reth_provider/providers/state/
overlay.rs1use alloy_eips::BlockNumHash;
2use alloy_primitives::{BlockHash, BlockNumber, B256};
3use metrics::{Counter, Histogram};
4use reth_chain_state::{EthPrimitives, LazyOverlay};
5use reth_db_api::{tables, transaction::DbTx, DatabaseError};
6use reth_errors::{ProviderError, ProviderResult};
7use reth_metrics::Metrics;
8use reth_primitives_traits::{
9 dashmap::{self, DashMap},
10 NodePrimitives,
11};
12use reth_prune_types::PruneSegment;
13use reth_stages_types::StageId;
14use reth_storage_api::{
15 BlockNumReader, ChangeSetReader, DBProvider, DatabaseProviderFactory,
16 DatabaseProviderROFactory, PruneCheckpointReader, StageCheckpointReader,
17 StorageChangeSetReader, StorageSettingsCache,
18};
19use reth_trie::{
20 hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
21 trie_cursor::{InMemoryTrieCursor, TrieCursor, TrieCursorFactory, TrieStorageCursor},
22 updates::TrieUpdatesSorted,
23 HashedPostStateSorted,
24};
25use reth_trie_db::{
26 ChangesetCache, DatabaseAccountTrieCursor, DatabaseHashedCursorFactory,
27 DatabaseStorageTrieCursor, LegacyKeyAdapter, PackedAccountsTrie, PackedKeyAdapter,
28 PackedStoragesTrie,
29};
30use std::{
31 ops::RangeInclusive,
32 sync::Arc,
33 time::{Duration, Instant},
34};
35use tracing::{debug, debug_span, instrument};
36
37#[derive(Clone, Metrics)]
39#[metrics(scope = "storage.providers.overlay")]
40pub(crate) struct OverlayStateProviderMetrics {
41 create_provider_duration: Histogram,
43 retrieve_trie_reverts_duration: Histogram,
45 retrieve_hashed_state_reverts_duration: Histogram,
47 trie_updates_size: Histogram,
49 hashed_state_size: Histogram,
51 database_provider_ro_duration: Histogram,
53 overlay_cache_misses: Counter,
55}
56
/// The trie updates and hashed post-state computed for one database tip.
///
/// Both parts are `Arc`s, so cloning an `Overlay` (e.g. out of the overlay cache) is cheap.
#[derive(Debug, Clone)]
pub(super) struct Overlay {
    /// Sorted trie updates layered over the database trie tables.
    pub(super) trie_updates: Arc<TrieUpdatesSorted>,
    /// Sorted hashed post-state layered over the database hashed state tables.
    pub(super) hashed_post_state: Arc<HashedPostStateSorted>,
}
63
/// Where an `OverlayBuilder` takes its in-memory overlay from.
#[derive(Debug, Clone)]
pub(super) enum OverlaySource<N: NodePrimitives = EthPrimitives> {
    /// Already-computed trie updates and hashed state. Only usable relative to the builder's own
    /// anchor; resolving it against any other anchor is an error.
    Immediate {
        /// Sorted trie updates of the overlay.
        trie: Arc<TrieUpdatesSorted>,
        /// Sorted hashed post-state of the overlay.
        state: Arc<HashedPostStateSorted>,
    },
    /// An overlay materialized on demand via `LazyOverlay::as_overlay` for a requested anchor.
    Lazy(LazyOverlay<N>),
}
80
/// Builds the overlay (trie updates plus hashed post-state) for a state anchored at a given
/// block. It combines database changeset reverts with an optional in-memory overlay source.
#[derive(Debug, Clone)]
pub struct OverlayBuilder<N: NodePrimitives = EthPrimitives> {
    /// Hash of the block the produced state is anchored at.
    anchor_hash: B256,
    /// Optional in-memory overlay layered on top of the (possibly reverted) database state.
    overlay_source: Option<OverlaySource<N>>,
    /// Cache used to obtain accumulated trie reverts for a range of blocks.
    changeset_cache: ChangesetCache,
    /// Metrics recorded while building overlays and overlay providers.
    metrics: OverlayStateProviderMetrics,
}
96
97impl<N: NodePrimitives> OverlayBuilder<N> {
98 pub fn new(anchor_hash: B256, changeset_cache: ChangesetCache) -> Self {
100 Self {
101 anchor_hash,
102 overlay_source: None,
103 changeset_cache,
104 metrics: OverlayStateProviderMetrics::default(),
105 }
106 }
107
108 pub(super) fn with_overlay_source(mut self, source: Option<OverlaySource<N>>) -> Self {
112 if let Some(OverlaySource::Lazy(lazy_overlay)) = source.as_ref() {
113 self.assert_lazy_overlay_anchor(lazy_overlay);
114 }
115 self.overlay_source = source;
116 self
117 }
118
119 fn assert_lazy_overlay_anchor(&self, lazy_overlay: &LazyOverlay<N>) {
120 let Some(lazy_overlay_anchor) = lazy_overlay.anchor_hash() else { return };
121 assert!(
122 lazy_overlay_anchor == self.anchor_hash,
123 "LazyOverlay's anchor ({}) != OverlayBuilder's anchor ({})",
124 lazy_overlay_anchor,
125 self.anchor_hash,
126 );
127 }
128
129 pub fn with_lazy_overlay(mut self, lazy_overlay: Option<LazyOverlay<N>>) -> Self {
133 if let Some(lazy_overlay) = lazy_overlay.as_ref() {
134 self.assert_lazy_overlay_anchor(lazy_overlay);
135 }
136 self.overlay_source = lazy_overlay.map(OverlaySource::Lazy);
137 self
138 }
139
140 pub fn with_hashed_state_overlay(
142 mut self,
143 hashed_state_overlay: Option<Arc<HashedPostStateSorted>>,
144 ) -> Self {
145 if let Some(state) = hashed_state_overlay {
146 self.overlay_source = Some(OverlaySource::Immediate {
147 trie: Arc::new(TrieUpdatesSorted::default()),
148 state,
149 });
150 }
151 self
152 }
153
154 pub fn with_extended_hashed_state_overlay(mut self, other: HashedPostStateSorted) -> Self {
159 match &mut self.overlay_source {
160 Some(OverlaySource::Immediate { state, .. }) => {
161 Arc::make_mut(state).extend_ref_and_sort(&other);
162 }
163 Some(OverlaySource::Lazy(overlay)) => {
164 let (trie, mut state) = overlay.as_overlay(self.anchor_hash);
166 Arc::make_mut(&mut state).extend_ref_and_sort(&other);
167 self.overlay_source = Some(OverlaySource::Immediate { trie, state });
168 }
169 None => {
170 self.overlay_source = Some(OverlaySource::Immediate {
171 trie: Arc::new(TrieUpdatesSorted::default()),
172 state: Arc::new(other),
173 });
174 }
175 }
176 self
177 }
178
179 fn resolve_overlays(
184 &self,
185 anchor_hash: BlockHash,
186 ) -> ProviderResult<(Arc<TrieUpdatesSorted>, Arc<HashedPostStateSorted>)> {
187 match &self.overlay_source {
188 Some(OverlaySource::Lazy(lazy_overlay)) => Ok(lazy_overlay.as_overlay(anchor_hash)),
189 Some(OverlaySource::Immediate { trie, state }) => {
190 if anchor_hash != self.anchor_hash {
191 return Err(ProviderError::other(std::io::Error::other(format!(
192 "anchor_hash {anchor_hash} doesn't match OverlayBuilder's configured anchor ({})",
193 self.anchor_hash
194 ))))
195 }
196 Ok((Arc::clone(trie), Arc::clone(state)))
197 }
198 None => Ok((
199 Arc::new(TrieUpdatesSorted::default()),
200 Arc::new(HashedPostStateSorted::default()),
201 )),
202 }
203 }
204
205 fn get_block_number<Provider>(&self, provider: &Provider) -> ProviderResult<BlockNumber>
207 where
208 Provider: BlockNumReader,
209 {
210 provider
211 .convert_hash_or_number(self.anchor_hash.into())?
212 .ok_or(ProviderError::BlockHashNotFound(self.anchor_hash))
213 }
214
215 fn get_db_tip_block<Provider>(&self, provider: &Provider) -> ProviderResult<BlockNumHash>
218 where
219 Provider: StageCheckpointReader + BlockNumReader,
220 {
221 let block_number = provider
222 .get_stage_checkpoint(StageId::Finish)?
223 .as_ref()
224 .map(|chk| chk.block_number)
225 .ok_or_else(|| ProviderError::InsufficientChangesets {
226 requested: 0,
227 available: 0..=0,
228 })?;
229 let hash = provider
230 .convert_number(block_number.into())?
231 .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;
232 Ok(BlockNumHash::new(block_number, hash))
233 }
234
235 fn reverts_required<Provider>(
241 &self,
242 provider: &Provider,
243 db_tip_block: BlockNumHash,
244 ) -> ProviderResult<Option<RangeInclusive<BlockNumber>>>
245 where
246 Provider: BlockNumReader + PruneCheckpointReader,
247 {
248 if db_tip_block.hash == self.anchor_hash {
250 return Ok(None)
251 }
252
253 if let Some(OverlaySource::Lazy(lazy_overlay)) = &self.overlay_source &&
257 lazy_overlay.has_anchor_hash(db_tip_block.hash)
258 {
259 return Ok(None)
260 }
261
262 let anchor_number = self.get_block_number(provider)?;
263
264 let prune_checkpoint = provider.get_prune_checkpoint(PruneSegment::AccountHistory)?;
268 let lower_bound = prune_checkpoint
269 .and_then(|chk| chk.block_number)
270 .map(|block_number| block_number + 1)
271 .unwrap_or_default();
272
273 let available_range = lower_bound..=db_tip_block.number;
274
275 if !available_range.contains(&anchor_number) {
277 return Err(ProviderError::InsufficientChangesets {
278 requested: anchor_number,
279 available: available_range,
280 });
281 }
282
283 Ok(Some(anchor_number + 1..=db_tip_block.number))
284 }
285
286 #[instrument(
288 level = "debug",
289 target = "providers::state::overlay",
290 skip_all,
291 fields(?db_tip_block, anchor_hash = ?self.anchor_hash)
292 )]
293 fn calculate_overlay<Provider>(
294 &self,
295 provider: &Provider,
296 db_tip_block: BlockNumHash,
297 ) -> ProviderResult<Overlay>
298 where
299 Provider: ChangeSetReader
300 + StorageChangeSetReader
301 + DBProvider
302 + BlockNumReader
303 + StageCheckpointReader
304 + PruneCheckpointReader
305 + StorageSettingsCache,
306 {
307 let retrieve_trie_reverts_duration;
311 let retrieve_hashed_state_reverts_duration;
312 let trie_updates_total_len;
313 let hashed_state_updates_total_len;
314
315 let (trie_updates, hashed_post_state) = if let Some(revert_blocks) =
317 self.reverts_required(provider, db_tip_block)?
318 {
319 debug!(
320 target: "providers::state::overlay",
321 ?revert_blocks,
322 "Collecting trie reverts for overlay state provider"
323 );
324
325 let mut trie_reverts = {
327 let _guard =
328 debug_span!(target: "providers::state::overlay", "retrieving_trie_reverts")
329 .entered();
330
331 let start = Instant::now();
332
333 let accumulated_reverts =
336 self.changeset_cache.get_or_compute_range(provider, revert_blocks.clone())?;
337
338 retrieve_trie_reverts_duration = start.elapsed();
339 accumulated_reverts
340 };
341
342 let mut hashed_state_reverts = {
344 let _guard = debug_span!(target: "providers::state::overlay", "retrieving_hashed_state_reverts").entered();
345
346 let start = Instant::now();
347 let res = reth_trie_db::from_reverts_auto(provider, revert_blocks)?;
348 retrieve_hashed_state_reverts_duration = start.elapsed();
349 res
350 };
351
352 let (overlay_trie, overlay_state) = self.resolve_overlays(self.anchor_hash)?;
355
356 let trie_updates = if trie_reverts.is_empty() {
357 overlay_trie
358 } else if !overlay_trie.is_empty() {
359 trie_reverts.extend_ref_and_sort(&overlay_trie);
360 Arc::new(trie_reverts)
361 } else {
362 Arc::new(trie_reverts)
363 };
364
365 let hashed_state_updates = if hashed_state_reverts.is_empty() {
366 overlay_state
367 } else if !overlay_state.is_empty() {
368 hashed_state_reverts.extend_ref_and_sort(&overlay_state);
369 Arc::new(hashed_state_reverts)
370 } else {
371 Arc::new(hashed_state_reverts)
372 };
373
374 trie_updates_total_len = trie_updates.total_len();
375 hashed_state_updates_total_len = hashed_state_updates.total_len();
376
377 debug!(
378 target: "providers::state::overlay",
379 num_trie_updates = ?trie_updates_total_len,
380 num_state_updates = ?hashed_state_updates_total_len,
381 "Reverted to target block",
382 );
383
384 (trie_updates, hashed_state_updates)
385 } else {
386 let (trie_updates, hashed_state) = self.resolve_overlays(db_tip_block.hash)?;
389
390 retrieve_trie_reverts_duration = Duration::ZERO;
391 retrieve_hashed_state_reverts_duration = Duration::ZERO;
392 trie_updates_total_len = trie_updates.total_len();
393 hashed_state_updates_total_len = hashed_state.total_len();
394
395 (trie_updates, hashed_state)
396 };
397
398 self.metrics
400 .retrieve_trie_reverts_duration
401 .record(retrieve_trie_reverts_duration.as_secs_f64());
402 self.metrics
403 .retrieve_hashed_state_reverts_duration
404 .record(retrieve_hashed_state_reverts_duration.as_secs_f64());
405 self.metrics.trie_updates_size.record(trie_updates_total_len as f64);
406 self.metrics.hashed_state_size.record(hashed_state_updates_total_len as f64);
407
408 Ok(Overlay { trie_updates, hashed_post_state })
409 }
410
411 #[instrument(level = "debug", target = "providers::state::overlay", skip_all)]
413 pub(super) fn build_overlay<Provider>(&self, provider: &Provider) -> ProviderResult<Overlay>
414 where
415 Provider: StageCheckpointReader
416 + PruneCheckpointReader
417 + ChangeSetReader
418 + StorageChangeSetReader
419 + DBProvider
420 + BlockNumReader
421 + StorageSettingsCache,
422 {
423 let db_tip_block = self.get_db_tip_block(provider)?;
424 self.calculate_overlay(provider, db_tip_block)
425 }
426}
427
/// Factory that opens read-only database providers and wraps them in an
/// [`OverlayStateProvider`] carrying the overlay computed by an `OverlayBuilder`.
#[derive(Debug, Clone)]
pub struct OverlayStateProviderFactory<F, N: NodePrimitives = EthPrimitives> {
    /// Underlying factory used to open database read-only providers.
    factory: F,
    /// Builder whose configuration determines the overlay contents.
    overlay_builder: OverlayBuilder<N>,
    /// Computed overlays keyed by the DB tip hash they were built for. Clones of the factory
    /// share this map through the `Arc`; the `with_*` builders replace it with an empty one.
    overlay_cache: Arc<DashMap<BlockHash, Overlay>>,
}
442
443impl<F, N: NodePrimitives> OverlayStateProviderFactory<F, N> {
444 pub fn new(factory: F, overlay_builder: OverlayBuilder<N>) -> Self {
446 Self { factory, overlay_builder, overlay_cache: Default::default() }
447 }
448
449 pub fn with_lazy_overlay(mut self, lazy_overlay: Option<LazyOverlay<N>>) -> Self {
451 self.overlay_builder = self.overlay_builder.with_lazy_overlay(lazy_overlay);
452 self.overlay_cache = Default::default();
453 self
454 }
455
456 pub fn with_hashed_state_overlay(
458 mut self,
459 hashed_state_overlay: Option<Arc<HashedPostStateSorted>>,
460 ) -> Self {
461 self.overlay_builder = self.overlay_builder.with_hashed_state_overlay(hashed_state_overlay);
462 self.overlay_cache = Default::default();
463 self
464 }
465
466 pub fn with_extended_hashed_state_overlay(mut self, other: HashedPostStateSorted) -> Self {
468 self.overlay_builder = self.overlay_builder.with_extended_hashed_state_overlay(other);
469 self.overlay_cache = Default::default();
470 self
471 }
472
473 #[instrument(level = "debug", target = "providers::state::overlay", skip_all)]
476 fn get_overlay<Provider>(&self, provider: &Provider) -> ProviderResult<Overlay>
477 where
478 Provider: StageCheckpointReader
479 + PruneCheckpointReader
480 + ChangeSetReader
481 + StorageChangeSetReader
482 + DBProvider
483 + BlockNumReader
484 + StorageSettingsCache,
485 {
486 let db_tip_block = self.overlay_builder.get_db_tip_block(provider)?;
487
488 let overlay = match self.overlay_cache.entry(db_tip_block.hash) {
489 dashmap::Entry::Occupied(entry) => entry.get().clone(),
490 dashmap::Entry::Vacant(entry) => {
491 self.overlay_builder.metrics.overlay_cache_misses.increment(1);
492 let overlay = self.overlay_builder.build_overlay(provider)?;
493 entry.insert(overlay.clone());
494 overlay
495 }
496 };
497
498 Ok(overlay)
499 }
500}
501
502impl<F, N> DatabaseProviderROFactory for OverlayStateProviderFactory<F, N>
503where
504 N: NodePrimitives,
505 F: DatabaseProviderFactory,
506 F::Provider: StageCheckpointReader
507 + PruneCheckpointReader
508 + DBProvider
509 + BlockNumReader
510 + ChangeSetReader
511 + StorageChangeSetReader
512 + StorageSettingsCache,
513{
514 type Provider = OverlayStateProvider<F::Provider>;
515
516 #[instrument(level = "debug", target = "providers::state::overlay", skip_all)]
518 fn database_provider_ro(&self) -> ProviderResult<OverlayStateProvider<F::Provider>> {
519 let overall_start = Instant::now();
520
521 let provider = {
523 let start = Instant::now();
524 let res = self.factory.database_provider_ro()?;
525 self.overlay_builder.metrics.create_provider_duration.record(start.elapsed());
526 res
527 };
528
529 let Overlay { trie_updates, hashed_post_state } = self.get_overlay(&provider)?;
530
531 let is_v2 = provider.cached_storage_settings().is_v2();
532 self.overlay_builder.metrics.database_provider_ro_duration.record(overall_start.elapsed());
533 Ok(OverlayStateProvider::new(provider, trie_updates, hashed_post_state, is_v2))
534 }
535}
536
/// Read-only state provider that layers in-memory trie updates and hashed post-state over the
/// tables read through `provider`'s transaction.
#[derive(Debug)]
pub struct OverlayStateProvider<Provider: DBProvider> {
    /// Database provider whose transaction backs all cursors.
    provider: Provider,
    /// Sorted trie updates overlaid on the database trie cursors.
    trie_updates: Arc<TrieUpdatesSorted>,
    /// Sorted hashed post-state overlaid on the database hashed-state cursors.
    hashed_post_state: Arc<HashedPostStateSorted>,
    /// Selects the packed trie tables (`true`) instead of the legacy `AccountsTrie` /
    /// `StoragesTrie` tables.
    is_v2: bool,
}
549
550impl<Provider> OverlayStateProvider<Provider>
551where
552 Provider: DBProvider,
553{
554 pub const fn new(
557 provider: Provider,
558 trie_updates: Arc<TrieUpdatesSorted>,
559 hashed_post_state: Arc<HashedPostStateSorted>,
560 is_v2: bool,
561 ) -> Self {
562 Self { provider, trie_updates, hashed_post_state, is_v2 }
563 }
564}
565
566impl<Provider> TrieCursorFactory for OverlayStateProvider<Provider>
567where
568 Provider: DBProvider,
569 Provider::Tx: DbTx,
570{
571 type AccountTrieCursor<'a>
572 = InMemoryTrieCursor<'a, Box<dyn TrieCursor + Send + 'a>>
573 where
574 Self: 'a;
575
576 type StorageTrieCursor<'a>
577 = InMemoryTrieCursor<'a, Box<dyn TrieStorageCursor + Send + 'a>>
578 where
579 Self: 'a;
580
581 fn account_trie_cursor(&self) -> Result<Self::AccountTrieCursor<'_>, DatabaseError> {
582 let tx = self.provider.tx_ref();
583 let trie_updates = self.trie_updates.as_ref();
584 let cursor: Box<dyn TrieCursor + Send> = if self.is_v2 {
585 Box::new(DatabaseAccountTrieCursor::<_, PackedKeyAdapter>::new(
586 tx.cursor_read::<PackedAccountsTrie>()?,
587 ))
588 } else {
589 Box::new(DatabaseAccountTrieCursor::<_, LegacyKeyAdapter>::new(
590 tx.cursor_read::<tables::AccountsTrie>()?,
591 ))
592 };
593 Ok(InMemoryTrieCursor::new_account(cursor, trie_updates))
594 }
595
596 fn storage_trie_cursor(
597 &self,
598 hashed_address: B256,
599 ) -> Result<Self::StorageTrieCursor<'_>, DatabaseError> {
600 let tx = self.provider.tx_ref();
601 let trie_updates = self.trie_updates.as_ref();
602 let cursor: Box<dyn TrieStorageCursor + Send> = if self.is_v2 {
603 Box::new(DatabaseStorageTrieCursor::<_, PackedKeyAdapter>::new(
604 tx.cursor_dup_read::<PackedStoragesTrie>()?,
605 hashed_address,
606 ))
607 } else {
608 Box::new(DatabaseStorageTrieCursor::<_, LegacyKeyAdapter>::new(
609 tx.cursor_dup_read::<tables::StoragesTrie>()?,
610 hashed_address,
611 ))
612 };
613 Ok(InMemoryTrieCursor::new_storage(cursor, trie_updates, hashed_address))
614 }
615}
616
617impl<Provider> HashedCursorFactory for OverlayStateProvider<Provider>
618where
619 Provider: DBProvider,
620{
621 type AccountCursor<'a>
622 = <HashedPostStateCursorFactory<
623 DatabaseHashedCursorFactory<&'a Provider::Tx>,
624 &'a Arc<HashedPostStateSorted>,
625 > as HashedCursorFactory>::AccountCursor<'a>
626 where
627 Self: 'a;
628
629 type StorageCursor<'a>
630 = <HashedPostStateCursorFactory<
631 DatabaseHashedCursorFactory<&'a Provider::Tx>,
632 &'a Arc<HashedPostStateSorted>,
633 > as HashedCursorFactory>::StorageCursor<'a>
634 where
635 Self: 'a;
636
637 fn hashed_account_cursor(&self) -> Result<Self::AccountCursor<'_>, DatabaseError> {
638 let db_hashed_cursor_factory = DatabaseHashedCursorFactory::new(self.provider.tx_ref());
639 let hashed_cursor_factory =
640 HashedPostStateCursorFactory::new(db_hashed_cursor_factory, &self.hashed_post_state);
641 hashed_cursor_factory.hashed_account_cursor()
642 }
643
644 fn hashed_storage_cursor(
645 &self,
646 hashed_address: B256,
647 ) -> Result<Self::StorageCursor<'_>, DatabaseError> {
648 let db_hashed_cursor_factory = DatabaseHashedCursorFactory::new(self.provider.tx_ref());
649 let hashed_cursor_factory =
650 HashedPostStateCursorFactory::new(db_hashed_cursor_factory, &self.hashed_post_state);
651 hashed_cursor_factory.hashed_storage_cursor(hashed_address)
652 }
653}