reth_provider/providers/state/
overlay.rs1use alloy_primitives::{BlockNumber, B256};
2use metrics::{Counter, Histogram};
3use reth_chain_state::LazyOverlay;
4use reth_db_api::DatabaseError;
5use reth_errors::{ProviderError, ProviderResult};
6use reth_metrics::Metrics;
7use reth_primitives_traits::dashmap::{self, DashMap};
8use reth_prune_types::PruneSegment;
9use reth_stages_types::StageId;
10use reth_storage_api::{
11 BlockNumReader, ChangeSetReader, DBProvider, DatabaseProviderFactory,
12 DatabaseProviderROFactory, PruneCheckpointReader, StageCheckpointReader,
13 StorageChangeSetReader, StorageSettingsCache,
14};
15use reth_trie::{
16 hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
17 trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
18 updates::TrieUpdatesSorted,
19 HashedPostStateSorted,
20};
21use reth_trie_db::{ChangesetCache, DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
22use std::{
23 sync::Arc,
24 time::{Duration, Instant},
25};
26use tracing::{debug, debug_span, instrument};
27
28#[derive(Clone, Metrics)]
30#[metrics(scope = "storage.providers.overlay")]
31pub(crate) struct OverlayStateProviderMetrics {
32 create_provider_duration: Histogram,
34 retrieve_trie_reverts_duration: Histogram,
36 retrieve_hashed_state_reverts_duration: Histogram,
38 trie_updates_size: Histogram,
40 hashed_state_size: Histogram,
42 database_provider_ro_duration: Histogram,
44 overlay_cache_misses: Counter,
46}
47
48#[derive(Debug, Clone)]
50struct Overlay {
51 trie_updates: Arc<TrieUpdatesSorted>,
52 hashed_post_state: Arc<HashedPostStateSorted>,
53}
54
55#[derive(Debug, Clone)]
60pub enum OverlaySource {
61 Immediate {
63 trie: Arc<TrieUpdatesSorted>,
65 state: Arc<HashedPostStateSorted>,
67 },
68 Lazy(LazyOverlay),
70}
71
72impl OverlaySource {
73 fn resolve(&self) -> (Arc<TrieUpdatesSorted>, Arc<HashedPostStateSorted>) {
77 match self {
78 Self::Immediate { trie, state } => (Arc::clone(trie), Arc::clone(state)),
79 Self::Lazy(lazy) => lazy.as_overlay(),
80 }
81 }
82}
83
84#[derive(Debug, Clone)]
89pub struct OverlayStateProviderFactory<F> {
90 factory: F,
92 block_hash: Option<B256>,
94 overlay_source: Option<OverlaySource>,
96 changeset_cache: ChangesetCache,
98 metrics: OverlayStateProviderMetrics,
100 overlay_cache: Arc<DashMap<BlockNumber, Overlay>>,
103}
104
105impl<F> OverlayStateProviderFactory<F> {
106 pub fn new(factory: F, changeset_cache: ChangesetCache) -> Self {
108 Self {
109 factory,
110 block_hash: None,
111 overlay_source: None,
112 changeset_cache,
113 metrics: OverlayStateProviderMetrics::default(),
114 overlay_cache: Default::default(),
115 }
116 }
117
118 pub const fn with_block_hash(mut self, block_hash: Option<B256>) -> Self {
121 self.block_hash = block_hash;
122 self
123 }
124
125 pub fn with_overlay_source(mut self, source: Option<OverlaySource>) -> Self {
129 self.overlay_source = source;
130 self.overlay_cache = Default::default();
132 self
133 }
134
135 pub fn with_lazy_overlay(mut self, lazy_overlay: Option<LazyOverlay>) -> Self {
139 self.overlay_source = lazy_overlay.map(OverlaySource::Lazy);
140 self.overlay_cache = Default::default();
142 self
143 }
144
145 pub fn with_hashed_state_overlay(
149 mut self,
150 hashed_state_overlay: Option<Arc<HashedPostStateSorted>>,
151 ) -> Self {
152 if let Some(state) = hashed_state_overlay {
153 self.overlay_source = Some(OverlaySource::Immediate {
154 trie: Arc::new(TrieUpdatesSorted::default()),
155 state,
156 });
157 self.overlay_cache = Default::default();
159 }
160 self
161 }
162
163 pub fn with_extended_hashed_state_overlay(mut self, other: HashedPostStateSorted) -> Self {
168 match &mut self.overlay_source {
169 Some(OverlaySource::Immediate { state, .. }) => {
170 Arc::make_mut(state).extend_ref_and_sort(&other);
171 }
172 Some(OverlaySource::Lazy(lazy)) => {
173 let (trie, mut state) = lazy.as_overlay();
175 Arc::make_mut(&mut state).extend_ref_and_sort(&other);
176 self.overlay_source = Some(OverlaySource::Immediate { trie, state });
177 }
178 None => {
179 self.overlay_source = Some(OverlaySource::Immediate {
180 trie: Arc::new(TrieUpdatesSorted::default()),
181 state: Arc::new(other),
182 });
183 }
184 }
185 self.overlay_cache = Default::default();
187 self
188 }
189}
190
191impl<F> OverlayStateProviderFactory<F>
192where
193 F: DatabaseProviderFactory,
194 F::Provider: StageCheckpointReader
195 + PruneCheckpointReader
196 + ChangeSetReader
197 + StorageChangeSetReader
198 + DBProvider
199 + BlockNumReader
200 + StorageSettingsCache,
201{
202 fn resolve_overlays(&self) -> (Arc<TrieUpdatesSorted>, Arc<HashedPostStateSorted>) {
207 match &self.overlay_source {
208 Some(source) => source.resolve(),
209 None => {
210 (Arc::new(TrieUpdatesSorted::default()), Arc::new(HashedPostStateSorted::default()))
211 }
212 }
213 }
214
215 fn get_requested_block_number(
217 &self,
218 provider: &F::Provider,
219 ) -> ProviderResult<Option<BlockNumber>> {
220 if let Some(block_hash) = self.block_hash {
221 Ok(Some(
222 provider
223 .convert_hash_or_number(block_hash.into())?
224 .ok_or_else(|| ProviderError::BlockHashNotFound(block_hash))?,
225 ))
226 } else {
227 Ok(None)
228 }
229 }
230
231 fn get_db_tip_block_number(&self, provider: &F::Provider) -> ProviderResult<BlockNumber> {
234 provider
235 .get_stage_checkpoint(StageId::Finish)?
236 .as_ref()
237 .map(|chk| chk.block_number)
238 .ok_or_else(|| ProviderError::InsufficientChangesets { requested: 0, available: 0..=0 })
239 }
240
241 fn reverts_required(
247 &self,
248 provider: &F::Provider,
249 db_tip_block: BlockNumber,
250 requested_block: BlockNumber,
251 ) -> ProviderResult<bool> {
252 if db_tip_block == requested_block {
255 return Ok(false)
256 }
257
258 let prune_checkpoint = provider.get_prune_checkpoint(PruneSegment::AccountHistory)?;
262 let lower_bound = prune_checkpoint
263 .and_then(|chk| chk.block_number)
264 .map(|block_number| block_number + 1)
265 .unwrap_or_default();
266
267 let available_range = lower_bound..=db_tip_block;
268
269 if !available_range.contains(&requested_block) {
271 return Err(ProviderError::InsufficientChangesets {
272 requested: requested_block,
273 available: available_range,
274 });
275 }
276
277 Ok(true)
278 }
279
280 #[instrument(
282 level = "debug",
283 target = "providers::state::overlay",
284 skip_all,
285 fields(db_tip_block)
286 )]
287 fn calculate_overlay(
288 &self,
289 provider: &F::Provider,
290 db_tip_block: BlockNumber,
291 ) -> ProviderResult<Overlay> {
292 let retrieve_trie_reverts_duration;
296 let retrieve_hashed_state_reverts_duration;
297 let trie_updates_total_len;
298 let hashed_state_updates_total_len;
299
300 let (trie_updates, hashed_post_state) = if let Some(from_block) =
302 self.get_requested_block_number(provider)? &&
303 self.reverts_required(provider, db_tip_block, from_block)?
304 {
305 debug!(
306 target: "providers::state::overlay",
307 block_hash = ?self.block_hash,
308 from_block,
309 db_tip_block,
310 range_start = from_block + 1,
311 range_end = db_tip_block,
312 "Collecting trie reverts for overlay state provider"
313 );
314
315 let mut trie_reverts = {
317 let _guard =
318 debug_span!(target: "providers::state::overlay", "Retrieving trie reverts")
319 .entered();
320
321 let start = Instant::now();
322
323 let accumulated_reverts = self
326 .changeset_cache
327 .get_or_compute_range(provider, (from_block + 1)..=db_tip_block)?;
328
329 retrieve_trie_reverts_duration = start.elapsed();
330 accumulated_reverts
331 };
332
333 let mut hashed_state_reverts = {
335 let _guard = debug_span!(target: "providers::state::overlay", "Retrieving hashed state reverts").entered();
336
337 let start = Instant::now();
338 let res = reth_trie_db::from_reverts_auto(provider, from_block + 1..)?;
339 retrieve_hashed_state_reverts_duration = start.elapsed();
340 res
341 };
342
343 let (overlay_trie, overlay_state) = self.resolve_overlays();
346
347 let trie_updates = if trie_reverts.is_empty() {
348 overlay_trie
349 } else if !overlay_trie.is_empty() {
350 trie_reverts.extend_ref_and_sort(&overlay_trie);
351 Arc::new(trie_reverts)
352 } else {
353 Arc::new(trie_reverts)
354 };
355
356 let hashed_state_updates = if hashed_state_reverts.is_empty() {
357 overlay_state
358 } else if !overlay_state.is_empty() {
359 hashed_state_reverts.extend_ref_and_sort(&overlay_state);
360 Arc::new(hashed_state_reverts)
361 } else {
362 Arc::new(hashed_state_reverts)
363 };
364
365 trie_updates_total_len = trie_updates.total_len();
366 hashed_state_updates_total_len = hashed_state_updates.total_len();
367
368 debug!(
369 target: "providers::state::overlay",
370 block_hash = ?self.block_hash,
371 ?from_block,
372 num_trie_updates = ?trie_updates_total_len,
373 num_state_updates = ?hashed_state_updates_total_len,
374 "Reverted to target block",
375 );
376
377 (trie_updates, hashed_state_updates)
378 } else {
379 let (trie_updates, hashed_state) = self.resolve_overlays();
381
382 retrieve_trie_reverts_duration = Duration::ZERO;
383 retrieve_hashed_state_reverts_duration = Duration::ZERO;
384 trie_updates_total_len = trie_updates.total_len();
385 hashed_state_updates_total_len = hashed_state.total_len();
386
387 (trie_updates, hashed_state)
388 };
389
390 self.metrics
392 .retrieve_trie_reverts_duration
393 .record(retrieve_trie_reverts_duration.as_secs_f64());
394 self.metrics
395 .retrieve_hashed_state_reverts_duration
396 .record(retrieve_hashed_state_reverts_duration.as_secs_f64());
397 self.metrics.trie_updates_size.record(trie_updates_total_len as f64);
398 self.metrics.hashed_state_size.record(hashed_state_updates_total_len as f64);
399
400 Ok(Overlay { trie_updates, hashed_post_state })
401 }
402
403 #[instrument(level = "debug", target = "providers::state::overlay", skip_all)]
406 fn get_overlay(&self, provider: &F::Provider) -> ProviderResult<Overlay> {
407 if self.block_hash.is_none() {
410 let (trie_updates, hashed_post_state) = self.resolve_overlays();
411 return Ok(Overlay { trie_updates, hashed_post_state })
412 }
413
414 let db_tip_block = self.get_db_tip_block_number(provider)?;
415
416 if let Some(entry) = self.overlay_cache.get(&db_tip_block) {
418 return Ok(entry.value().clone());
419 }
420
421 let mut cache_miss = false;
424 let overlay = match self.overlay_cache.entry(db_tip_block) {
425 dashmap::Entry::Occupied(entry) => entry.get().clone(),
426 dashmap::Entry::Vacant(entry) => {
427 cache_miss = true;
428 let overlay = self.calculate_overlay(provider, db_tip_block)?;
429 entry.insert(overlay.clone());
430 overlay
431 }
432 };
433
434 if cache_miss {
435 self.metrics.overlay_cache_misses.increment(1);
436 }
437
438 Ok(overlay)
439 }
440}
441
442impl<F> DatabaseProviderROFactory for OverlayStateProviderFactory<F>
443where
444 F: DatabaseProviderFactory,
445 F::Provider: StageCheckpointReader
446 + PruneCheckpointReader
447 + BlockNumReader
448 + ChangeSetReader
449 + StorageChangeSetReader
450 + StorageSettingsCache,
451{
452 type Provider = OverlayStateProvider<F::Provider>;
453
454 #[instrument(level = "debug", target = "providers::state::overlay", skip_all)]
456 fn database_provider_ro(&self) -> ProviderResult<OverlayStateProvider<F::Provider>> {
457 let overall_start = Instant::now();
458
459 let provider = {
461 let _guard =
462 debug_span!(target: "providers::state::overlay", "Creating db provider").entered();
463
464 let start = Instant::now();
465 let res = self.factory.database_provider_ro()?;
466 self.metrics.create_provider_duration.record(start.elapsed());
467 res
468 };
469
470 let Overlay { trie_updates, hashed_post_state } = self.get_overlay(&provider)?;
471
472 self.metrics.database_provider_ro_duration.record(overall_start.elapsed());
473 Ok(OverlayStateProvider::new(provider, trie_updates, hashed_post_state))
474 }
475}
476
477#[derive(Debug)]
483pub struct OverlayStateProvider<Provider: DBProvider> {
484 provider: Provider,
485 trie_updates: Arc<TrieUpdatesSorted>,
486 hashed_post_state: Arc<HashedPostStateSorted>,
487}
488
489impl<Provider> OverlayStateProvider<Provider>
490where
491 Provider: DBProvider,
492{
493 pub const fn new(
496 provider: Provider,
497 trie_updates: Arc<TrieUpdatesSorted>,
498 hashed_post_state: Arc<HashedPostStateSorted>,
499 ) -> Self {
500 Self { provider, trie_updates, hashed_post_state }
501 }
502}
503
504impl<Provider> TrieCursorFactory for OverlayStateProvider<Provider>
505where
506 Provider: DBProvider,
507{
508 type AccountTrieCursor<'a>
509 = <InMemoryTrieCursorFactory<
510 DatabaseTrieCursorFactory<&'a Provider::Tx>,
511 &'a TrieUpdatesSorted,
512 > as TrieCursorFactory>::AccountTrieCursor<'a>
513 where
514 Self: 'a;
515
516 type StorageTrieCursor<'a>
517 = <InMemoryTrieCursorFactory<
518 DatabaseTrieCursorFactory<&'a Provider::Tx>,
519 &'a TrieUpdatesSorted,
520 > as TrieCursorFactory>::StorageTrieCursor<'a>
521 where
522 Self: 'a;
523
524 fn account_trie_cursor(&self) -> Result<Self::AccountTrieCursor<'_>, DatabaseError> {
525 let db_trie_cursor_factory = DatabaseTrieCursorFactory::new(self.provider.tx_ref());
526 let trie_cursor_factory =
527 InMemoryTrieCursorFactory::new(db_trie_cursor_factory, self.trie_updates.as_ref());
528 trie_cursor_factory.account_trie_cursor()
529 }
530
531 fn storage_trie_cursor(
532 &self,
533 hashed_address: B256,
534 ) -> Result<Self::StorageTrieCursor<'_>, DatabaseError> {
535 let db_trie_cursor_factory = DatabaseTrieCursorFactory::new(self.provider.tx_ref());
536 let trie_cursor_factory =
537 InMemoryTrieCursorFactory::new(db_trie_cursor_factory, self.trie_updates.as_ref());
538 trie_cursor_factory.storage_trie_cursor(hashed_address)
539 }
540}
541
542impl<Provider> HashedCursorFactory for OverlayStateProvider<Provider>
543where
544 Provider: DBProvider,
545{
546 type AccountCursor<'a>
547 = <HashedPostStateCursorFactory<
548 DatabaseHashedCursorFactory<&'a Provider::Tx>,
549 &'a Arc<HashedPostStateSorted>,
550 > as HashedCursorFactory>::AccountCursor<'a>
551 where
552 Self: 'a;
553
554 type StorageCursor<'a>
555 = <HashedPostStateCursorFactory<
556 DatabaseHashedCursorFactory<&'a Provider::Tx>,
557 &'a Arc<HashedPostStateSorted>,
558 > as HashedCursorFactory>::StorageCursor<'a>
559 where
560 Self: 'a;
561
562 fn hashed_account_cursor(&self) -> Result<Self::AccountCursor<'_>, DatabaseError> {
563 let db_hashed_cursor_factory = DatabaseHashedCursorFactory::new(self.provider.tx_ref());
564 let hashed_cursor_factory =
565 HashedPostStateCursorFactory::new(db_hashed_cursor_factory, &self.hashed_post_state);
566 hashed_cursor_factory.hashed_account_cursor()
567 }
568
569 fn hashed_storage_cursor(
570 &self,
571 hashed_address: B256,
572 ) -> Result<Self::StorageCursor<'_>, DatabaseError> {
573 let db_hashed_cursor_factory = DatabaseHashedCursorFactory::new(self.provider.tx_ref());
574 let hashed_cursor_factory =
575 HashedPostStateCursorFactory::new(db_hashed_cursor_factory, &self.hashed_post_state);
576 hashed_cursor_factory.hashed_storage_cursor(hashed_address)
577 }
578}