reth_provider/providers/state/
overlay.rs1use alloy_primitives::{BlockNumber, B256};
2use metrics::{Counter, Histogram};
3use reth_chain_state::LazyOverlay;
4use reth_db_api::{tables, transaction::DbTx, DatabaseError};
5use reth_errors::{ProviderError, ProviderResult};
6use reth_metrics::Metrics;
7use reth_primitives_traits::dashmap::{self, DashMap};
8use reth_prune_types::PruneSegment;
9use reth_stages_types::StageId;
10use reth_storage_api::{
11 BlockNumReader, ChangeSetReader, DBProvider, DatabaseProviderFactory,
12 DatabaseProviderROFactory, PruneCheckpointReader, StageCheckpointReader,
13 StorageChangeSetReader, StorageSettingsCache,
14};
15use reth_trie::{
16 hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
17 trie_cursor::{InMemoryTrieCursor, TrieCursor, TrieCursorFactory, TrieStorageCursor},
18 updates::TrieUpdatesSorted,
19 HashedPostStateSorted,
20};
21use reth_trie_db::{
22 ChangesetCache, DatabaseAccountTrieCursor, DatabaseHashedCursorFactory,
23 DatabaseStorageTrieCursor, LegacyKeyAdapter, PackedAccountsTrie, PackedKeyAdapter,
24 PackedStoragesTrie,
25};
26use std::{
27 sync::Arc,
28 time::{Duration, Instant},
29};
30use tracing::{debug, debug_span, instrument};
31
/// Metrics recorded while building overlay state providers.
#[derive(Clone, Metrics)]
#[metrics(scope = "storage.providers.overlay")]
pub(crate) struct OverlayStateProviderMetrics {
    /// Duration of obtaining the read-only provider from the wrapped factory.
    create_provider_duration: Histogram,
    /// Duration, in seconds, of retrieving the trie reverts via the changeset cache. Recorded as
    /// zero when no reverts were needed.
    retrieve_trie_reverts_duration: Histogram,
    /// Duration, in seconds, of retrieving the hashed state reverts. Recorded as zero when no
    /// reverts were needed.
    retrieve_hashed_state_reverts_duration: Histogram,
    /// Total length of the trie updates of a calculated overlay.
    trie_updates_size: Histogram,
    /// Total length of the hashed state of a calculated overlay.
    hashed_state_size: Histogram,
    /// Overall duration of one `database_provider_ro` call on the overlay factory, measured up to
    /// the point just before the overlay provider is constructed.
    database_provider_ro_duration: Histogram,
    /// Number of times an overlay had to be calculated because it was not in the overlay cache.
    overlay_cache_misses: Counter,
}
51
/// The pair of sorted trie updates and sorted hashed post state handed to an
/// [`OverlayStateProvider`]. Values of this type are what the factory's overlay cache stores.
#[derive(Debug, Clone)]
struct Overlay {
    /// Sorted trie updates of the overlay.
    trie_updates: Arc<TrieUpdatesSorted>,
    /// Sorted hashed post state of the overlay.
    hashed_post_state: Arc<HashedPostStateSorted>,
}
58
/// Where the in-memory overlay data of an [`OverlayStateProviderFactory`] comes from.
#[derive(Debug, Clone)]
pub enum OverlaySource {
    /// Overlay data that is already available as shared, sorted collections.
    Immediate {
        /// Sorted trie updates.
        trie: Arc<TrieUpdatesSorted>,
        /// Sorted hashed post state.
        state: Arc<HashedPostStateSorted>,
    },
    /// Overlay data obtained from a [`LazyOverlay`] by calling `as_overlay` whenever the source is
    /// resolved.
    Lazy(LazyOverlay),
}
75
76impl OverlaySource {
77 fn resolve(&self) -> (Arc<TrieUpdatesSorted>, Arc<HashedPostStateSorted>) {
81 match self {
82 Self::Immediate { trie, state } => (Arc::clone(trie), Arc::clone(state)),
83 Self::Lazy(lazy) => lazy.as_overlay(),
84 }
85 }
86}
87
/// Factory producing [`OverlayStateProvider`]s: read-only database providers paired with an
/// in-memory overlay of trie updates and hashed post state.
///
/// If a block hash is configured, the overlay additionally contains the revert data for the blocks
/// between that block and the database tip.
#[derive(Debug, Clone)]
pub struct OverlayStateProviderFactory<F> {
    /// The wrapped factory used to open read-only database providers.
    factory: F,
    /// Optional hash of the block that reverts are collected for. When `None`, only the configured
    /// overlay source is used.
    block_hash: Option<B256>,
    /// Optional source of the overlay data. When `None`, empty overlays are used.
    overlay_source: Option<OverlaySource>,
    /// Cache the trie reverts for a block range are fetched from.
    changeset_cache: ChangesetCache,
    /// Metrics recorded by this factory.
    metrics: OverlayStateProviderMetrics,
    /// Previously calculated overlays, keyed by the database tip block number they were calculated
    /// against. The map sits behind an [`Arc`], so clones of the factory share it until one of the
    /// overlay setters installs a fresh map.
    overlay_cache: Arc<DashMap<BlockNumber, Overlay>>,
}
108
109impl<F> OverlayStateProviderFactory<F> {
110 pub fn new(factory: F, changeset_cache: ChangesetCache) -> Self {
112 Self {
113 factory,
114 block_hash: None,
115 overlay_source: None,
116 changeset_cache,
117 metrics: OverlayStateProviderMetrics::default(),
118 overlay_cache: Default::default(),
119 }
120 }
121
122 pub const fn with_block_hash(mut self, block_hash: Option<B256>) -> Self {
125 self.block_hash = block_hash;
126 self
127 }
128
129 pub fn with_overlay_source(mut self, source: Option<OverlaySource>) -> Self {
133 self.overlay_source = source;
134 self.overlay_cache = Default::default();
136 self
137 }
138
139 pub fn with_lazy_overlay(mut self, lazy_overlay: Option<LazyOverlay>) -> Self {
143 self.overlay_source = lazy_overlay.map(OverlaySource::Lazy);
144 self.overlay_cache = Default::default();
146 self
147 }
148
149 pub fn with_hashed_state_overlay(
153 mut self,
154 hashed_state_overlay: Option<Arc<HashedPostStateSorted>>,
155 ) -> Self {
156 if let Some(state) = hashed_state_overlay {
157 self.overlay_source = Some(OverlaySource::Immediate {
158 trie: Arc::new(TrieUpdatesSorted::default()),
159 state,
160 });
161 self.overlay_cache = Default::default();
163 }
164 self
165 }
166
167 pub fn with_extended_hashed_state_overlay(mut self, other: HashedPostStateSorted) -> Self {
172 match &mut self.overlay_source {
173 Some(OverlaySource::Immediate { state, .. }) => {
174 Arc::make_mut(state).extend_ref_and_sort(&other);
175 }
176 Some(OverlaySource::Lazy(lazy)) => {
177 let (trie, mut state) = lazy.as_overlay();
179 Arc::make_mut(&mut state).extend_ref_and_sort(&other);
180 self.overlay_source = Some(OverlaySource::Immediate { trie, state });
181 }
182 None => {
183 self.overlay_source = Some(OverlaySource::Immediate {
184 trie: Arc::new(TrieUpdatesSorted::default()),
185 state: Arc::new(other),
186 });
187 }
188 }
189 self.overlay_cache = Default::default();
191 self
192 }
193}
194
195impl<F> OverlayStateProviderFactory<F>
196where
197 F: DatabaseProviderFactory,
198 F::Provider: StageCheckpointReader
199 + PruneCheckpointReader
200 + ChangeSetReader
201 + StorageChangeSetReader
202 + DBProvider
203 + BlockNumReader
204 + StorageSettingsCache,
205{
206 fn resolve_overlays(&self) -> (Arc<TrieUpdatesSorted>, Arc<HashedPostStateSorted>) {
211 match &self.overlay_source {
212 Some(source) => source.resolve(),
213 None => {
214 (Arc::new(TrieUpdatesSorted::default()), Arc::new(HashedPostStateSorted::default()))
215 }
216 }
217 }
218
219 fn get_requested_block_number(
221 &self,
222 provider: &F::Provider,
223 ) -> ProviderResult<Option<BlockNumber>> {
224 if let Some(block_hash) = self.block_hash {
225 Ok(Some(
226 provider
227 .convert_hash_or_number(block_hash.into())?
228 .ok_or_else(|| ProviderError::BlockHashNotFound(block_hash))?,
229 ))
230 } else {
231 Ok(None)
232 }
233 }
234
235 fn get_db_tip_block_number(&self, provider: &F::Provider) -> ProviderResult<BlockNumber> {
238 provider
239 .get_stage_checkpoint(StageId::Finish)?
240 .as_ref()
241 .map(|chk| chk.block_number)
242 .ok_or_else(|| ProviderError::InsufficientChangesets { requested: 0, available: 0..=0 })
243 }
244
245 fn reverts_required(
251 &self,
252 provider: &F::Provider,
253 db_tip_block: BlockNumber,
254 requested_block: BlockNumber,
255 ) -> ProviderResult<bool> {
256 if db_tip_block == requested_block {
259 return Ok(false)
260 }
261
262 let prune_checkpoint = provider.get_prune_checkpoint(PruneSegment::AccountHistory)?;
266 let lower_bound = prune_checkpoint
267 .and_then(|chk| chk.block_number)
268 .map(|block_number| block_number + 1)
269 .unwrap_or_default();
270
271 let available_range = lower_bound..=db_tip_block;
272
273 if !available_range.contains(&requested_block) {
275 return Err(ProviderError::InsufficientChangesets {
276 requested: requested_block,
277 available: available_range,
278 });
279 }
280
281 Ok(true)
282 }
283
284 #[instrument(
286 level = "debug",
287 target = "providers::state::overlay",
288 skip_all,
289 fields(%db_tip_block)
290 )]
291 fn calculate_overlay(
292 &self,
293 provider: &F::Provider,
294 db_tip_block: BlockNumber,
295 ) -> ProviderResult<Overlay> {
296 let retrieve_trie_reverts_duration;
300 let retrieve_hashed_state_reverts_duration;
301 let trie_updates_total_len;
302 let hashed_state_updates_total_len;
303
304 let (trie_updates, hashed_post_state) = if let Some(from_block) =
306 self.get_requested_block_number(provider)? &&
307 self.reverts_required(provider, db_tip_block, from_block)?
308 {
309 debug!(
310 target: "providers::state::overlay",
311 block_hash = ?self.block_hash,
312 from_block,
313 db_tip_block,
314 range_start = from_block + 1,
315 range_end = db_tip_block,
316 "Collecting trie reverts for overlay state provider"
317 );
318
319 let mut trie_reverts = {
321 let _guard =
322 debug_span!(target: "providers::state::overlay", "retrieving_trie_reverts")
323 .entered();
324
325 let start = Instant::now();
326
327 let accumulated_reverts = self
330 .changeset_cache
331 .get_or_compute_range(provider, (from_block + 1)..=db_tip_block)?;
332
333 retrieve_trie_reverts_duration = start.elapsed();
334 accumulated_reverts
335 };
336
337 let mut hashed_state_reverts = {
339 let _guard = debug_span!(target: "providers::state::overlay", "retrieving_hashed_state_reverts").entered();
340
341 let start = Instant::now();
342 let res = reth_trie_db::from_reverts_auto(provider, from_block + 1..)?;
343 retrieve_hashed_state_reverts_duration = start.elapsed();
344 res
345 };
346
347 let (overlay_trie, overlay_state) = self.resolve_overlays();
350
351 let trie_updates = if trie_reverts.is_empty() {
352 overlay_trie
353 } else if !overlay_trie.is_empty() {
354 trie_reverts.extend_ref_and_sort(&overlay_trie);
355 Arc::new(trie_reverts)
356 } else {
357 Arc::new(trie_reverts)
358 };
359
360 let hashed_state_updates = if hashed_state_reverts.is_empty() {
361 overlay_state
362 } else if !overlay_state.is_empty() {
363 hashed_state_reverts.extend_ref_and_sort(&overlay_state);
364 Arc::new(hashed_state_reverts)
365 } else {
366 Arc::new(hashed_state_reverts)
367 };
368
369 trie_updates_total_len = trie_updates.total_len();
370 hashed_state_updates_total_len = hashed_state_updates.total_len();
371
372 debug!(
373 target: "providers::state::overlay",
374 block_hash = ?self.block_hash,
375 ?from_block,
376 num_trie_updates = ?trie_updates_total_len,
377 num_state_updates = ?hashed_state_updates_total_len,
378 "Reverted to target block",
379 );
380
381 (trie_updates, hashed_state_updates)
382 } else {
383 let (trie_updates, hashed_state) = self.resolve_overlays();
385
386 retrieve_trie_reverts_duration = Duration::ZERO;
387 retrieve_hashed_state_reverts_duration = Duration::ZERO;
388 trie_updates_total_len = trie_updates.total_len();
389 hashed_state_updates_total_len = hashed_state.total_len();
390
391 (trie_updates, hashed_state)
392 };
393
394 self.metrics
396 .retrieve_trie_reverts_duration
397 .record(retrieve_trie_reverts_duration.as_secs_f64());
398 self.metrics
399 .retrieve_hashed_state_reverts_duration
400 .record(retrieve_hashed_state_reverts_duration.as_secs_f64());
401 self.metrics.trie_updates_size.record(trie_updates_total_len as f64);
402 self.metrics.hashed_state_size.record(hashed_state_updates_total_len as f64);
403
404 Ok(Overlay { trie_updates, hashed_post_state })
405 }
406
407 #[instrument(level = "debug", target = "providers::state::overlay", skip_all)]
410 fn get_overlay(&self, provider: &F::Provider) -> ProviderResult<Overlay> {
411 if self.block_hash.is_none() {
414 let (trie_updates, hashed_post_state) = self.resolve_overlays();
415 return Ok(Overlay { trie_updates, hashed_post_state })
416 }
417
418 let db_tip_block = self.get_db_tip_block_number(provider)?;
419
420 if let Some(entry) = self.overlay_cache.get(&db_tip_block) {
422 return Ok(entry.value().clone());
423 }
424
425 let mut cache_miss = false;
428 let overlay = match self.overlay_cache.entry(db_tip_block) {
429 dashmap::Entry::Occupied(entry) => entry.get().clone(),
430 dashmap::Entry::Vacant(entry) => {
431 cache_miss = true;
432 let overlay = self.calculate_overlay(provider, db_tip_block)?;
433 entry.insert(overlay.clone());
434 overlay
435 }
436 };
437
438 if cache_miss {
439 self.metrics.overlay_cache_misses.increment(1);
440 }
441
442 Ok(overlay)
443 }
444}
445
446impl<F> DatabaseProviderROFactory for OverlayStateProviderFactory<F>
447where
448 F: DatabaseProviderFactory,
449 F::Provider: StageCheckpointReader
450 + PruneCheckpointReader
451 + BlockNumReader
452 + ChangeSetReader
453 + StorageChangeSetReader
454 + StorageSettingsCache,
455{
456 type Provider = OverlayStateProvider<F::Provider>;
457
458 #[instrument(level = "debug", target = "providers::state::overlay", skip_all)]
460 fn database_provider_ro(&self) -> ProviderResult<OverlayStateProvider<F::Provider>> {
461 let overall_start = Instant::now();
462
463 let provider = {
465 let start = Instant::now();
466 let res = self.factory.database_provider_ro()?;
467 self.metrics.create_provider_duration.record(start.elapsed());
468 res
469 };
470
471 let Overlay { trie_updates, hashed_post_state } = self.get_overlay(&provider)?;
472
473 let is_v2 = provider.cached_storage_settings().is_v2();
474 self.metrics.database_provider_ro_duration.record(overall_start.elapsed());
475 Ok(OverlayStateProvider::new(provider, trie_updates, hashed_post_state, is_v2))
476 }
477}
478
/// A database provider combined with an in-memory overlay of trie updates and hashed post state.
///
/// The cursor factory implementations of this type create cursors that are given both the
/// database transaction of the provider and the overlay data.
#[derive(Debug)]
pub struct OverlayStateProvider<Provider: DBProvider> {
    /// The underlying database provider. Cursors are opened on its transaction.
    provider: Provider,
    /// Sorted trie updates passed to the in-memory trie cursors.
    trie_updates: Arc<TrieUpdatesSorted>,
    /// Sorted hashed post state passed to the hashed post state cursor factory.
    hashed_post_state: Arc<HashedPostStateSorted>,
    /// Whether trie cursors read the packed trie tables (`true`) or the legacy trie tables
    /// (`false`).
    is_v2: bool,
}
491
492impl<Provider> OverlayStateProvider<Provider>
493where
494 Provider: DBProvider,
495{
496 pub const fn new(
499 provider: Provider,
500 trie_updates: Arc<TrieUpdatesSorted>,
501 hashed_post_state: Arc<HashedPostStateSorted>,
502 is_v2: bool,
503 ) -> Self {
504 Self { provider, trie_updates, hashed_post_state, is_v2 }
505 }
506}
507
508impl<Provider> TrieCursorFactory for OverlayStateProvider<Provider>
509where
510 Provider: DBProvider,
511 Provider::Tx: DbTx,
512{
513 type AccountTrieCursor<'a>
514 = InMemoryTrieCursor<'a, Box<dyn TrieCursor + Send + 'a>>
515 where
516 Self: 'a;
517
518 type StorageTrieCursor<'a>
519 = InMemoryTrieCursor<'a, Box<dyn TrieStorageCursor + Send + 'a>>
520 where
521 Self: 'a;
522
523 fn account_trie_cursor(&self) -> Result<Self::AccountTrieCursor<'_>, DatabaseError> {
524 let tx = self.provider.tx_ref();
525 let trie_updates = self.trie_updates.as_ref();
526 let cursor: Box<dyn TrieCursor + Send> = if self.is_v2 {
527 Box::new(DatabaseAccountTrieCursor::<_, PackedKeyAdapter>::new(
528 tx.cursor_read::<PackedAccountsTrie>()?,
529 ))
530 } else {
531 Box::new(DatabaseAccountTrieCursor::<_, LegacyKeyAdapter>::new(
532 tx.cursor_read::<tables::AccountsTrie>()?,
533 ))
534 };
535 Ok(InMemoryTrieCursor::new_account(cursor, trie_updates))
536 }
537
538 fn storage_trie_cursor(
539 &self,
540 hashed_address: B256,
541 ) -> Result<Self::StorageTrieCursor<'_>, DatabaseError> {
542 let tx = self.provider.tx_ref();
543 let trie_updates = self.trie_updates.as_ref();
544 let cursor: Box<dyn TrieStorageCursor + Send> = if self.is_v2 {
545 Box::new(DatabaseStorageTrieCursor::<_, PackedKeyAdapter>::new(
546 tx.cursor_dup_read::<PackedStoragesTrie>()?,
547 hashed_address,
548 ))
549 } else {
550 Box::new(DatabaseStorageTrieCursor::<_, LegacyKeyAdapter>::new(
551 tx.cursor_dup_read::<tables::StoragesTrie>()?,
552 hashed_address,
553 ))
554 };
555 Ok(InMemoryTrieCursor::new_storage(cursor, trie_updates, hashed_address))
556 }
557}
558
559impl<Provider> HashedCursorFactory for OverlayStateProvider<Provider>
560where
561 Provider: DBProvider,
562{
563 type AccountCursor<'a>
564 = <HashedPostStateCursorFactory<
565 DatabaseHashedCursorFactory<&'a Provider::Tx>,
566 &'a Arc<HashedPostStateSorted>,
567 > as HashedCursorFactory>::AccountCursor<'a>
568 where
569 Self: 'a;
570
571 type StorageCursor<'a>
572 = <HashedPostStateCursorFactory<
573 DatabaseHashedCursorFactory<&'a Provider::Tx>,
574 &'a Arc<HashedPostStateSorted>,
575 > as HashedCursorFactory>::StorageCursor<'a>
576 where
577 Self: 'a;
578
579 fn hashed_account_cursor(&self) -> Result<Self::AccountCursor<'_>, DatabaseError> {
580 let db_hashed_cursor_factory = DatabaseHashedCursorFactory::new(self.provider.tx_ref());
581 let hashed_cursor_factory =
582 HashedPostStateCursorFactory::new(db_hashed_cursor_factory, &self.hashed_post_state);
583 hashed_cursor_factory.hashed_account_cursor()
584 }
585
586 fn hashed_storage_cursor(
587 &self,
588 hashed_address: B256,
589 ) -> Result<Self::StorageCursor<'_>, DatabaseError> {
590 let db_hashed_cursor_factory = DatabaseHashedCursorFactory::new(self.provider.tx_ref());
591 let hashed_cursor_factory =
592 HashedPostStateCursorFactory::new(db_hashed_cursor_factory, &self.hashed_post_state);
593 hashed_cursor_factory.hashed_storage_cursor(hashed_address)
594 }
595}