reth_provider/providers/state/
overlay.rs1use alloy_primitives::{BlockNumber, B256};
2use metrics::{Counter, Histogram};
3use parking_lot::RwLock;
4use reth_db_api::DatabaseError;
5use reth_errors::{ProviderError, ProviderResult};
6use reth_metrics::Metrics;
7use reth_prune_types::PruneSegment;
8use reth_stages_types::StageId;
9use reth_storage_api::{
10 BlockNumReader, DBProvider, DatabaseProviderFactory, DatabaseProviderROFactory,
11 PruneCheckpointReader, StageCheckpointReader, TrieReader,
12};
13use reth_trie::{
14 hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
15 trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
16 updates::TrieUpdatesSorted,
17 HashedPostStateSorted, KeccakKeyHasher,
18};
19use reth_trie_db::{
20 DatabaseHashedCursorFactory, DatabaseHashedPostState, DatabaseTrieCursorFactory,
21};
22use std::{
23 collections::{hash_map::Entry, HashMap},
24 sync::Arc,
25 time::{Duration, Instant},
26};
27use tracing::{debug, debug_span, instrument};
28
29#[derive(Clone, Metrics)]
31#[metrics(scope = "storage.providers.overlay")]
32pub(crate) struct OverlayStateProviderMetrics {
33 create_provider_duration: Histogram,
35 retrieve_trie_reverts_duration: Histogram,
37 retrieve_hashed_state_reverts_duration: Histogram,
39 trie_updates_size: Histogram,
41 hashed_state_size: Histogram,
43 database_provider_ro_duration: Histogram,
45 overlay_cache_misses: Counter,
47}
48
49#[derive(Debug, Clone)]
51struct Overlay {
52 trie_updates: Arc<TrieUpdatesSorted>,
53 hashed_post_state: Arc<HashedPostStateSorted>,
54}
55
56#[derive(Debug, Clone)]
61pub struct OverlayStateProviderFactory<F> {
62 factory: F,
64 block_hash: Option<B256>,
66 trie_overlay: Option<Arc<TrieUpdatesSorted>>,
68 hashed_state_overlay: Option<Arc<HashedPostStateSorted>>,
70 metrics: OverlayStateProviderMetrics,
72 overlay_cache: Arc<RwLock<HashMap<BlockNumber, Overlay>>>,
75}
76
77impl<F> OverlayStateProviderFactory<F> {
78 pub fn new(factory: F) -> Self {
80 Self {
81 factory,
82 block_hash: None,
83 trie_overlay: None,
84 hashed_state_overlay: None,
85 metrics: OverlayStateProviderMetrics::default(),
86 overlay_cache: Default::default(),
87 }
88 }
89
90 pub const fn with_block_hash(mut self, block_hash: Option<B256>) -> Self {
93 self.block_hash = block_hash;
94 self
95 }
96
97 pub fn with_trie_overlay(mut self, trie_overlay: Option<Arc<TrieUpdatesSorted>>) -> Self {
101 self.trie_overlay = trie_overlay;
102 self
103 }
104
105 pub fn with_hashed_state_overlay(
109 mut self,
110 hashed_state_overlay: Option<Arc<HashedPostStateSorted>>,
111 ) -> Self {
112 self.hashed_state_overlay = hashed_state_overlay;
113 self
114 }
115}
116
117impl<F> OverlayStateProviderFactory<F>
118where
119 F: DatabaseProviderFactory,
120 F::Provider: TrieReader + StageCheckpointReader + PruneCheckpointReader + BlockNumReader,
121{
122 fn get_requested_block_number(
124 &self,
125 provider: &F::Provider,
126 ) -> ProviderResult<Option<BlockNumber>> {
127 if let Some(block_hash) = self.block_hash {
128 Ok(Some(
129 provider
130 .convert_hash_or_number(block_hash.into())?
131 .ok_or_else(|| ProviderError::BlockHashNotFound(block_hash))?,
132 ))
133 } else {
134 Ok(None)
135 }
136 }
137
138 fn get_db_tip_block_number(&self, provider: &F::Provider) -> ProviderResult<BlockNumber> {
141 provider
142 .get_stage_checkpoint(StageId::MerkleChangeSets)?
143 .as_ref()
144 .map(|chk| chk.block_number)
145 .ok_or_else(|| ProviderError::InsufficientChangesets { requested: 0, available: 0..=0 })
146 }
147
148 fn reverts_required(
155 &self,
156 provider: &F::Provider,
157 db_tip_block: BlockNumber,
158 requested_block: BlockNumber,
159 ) -> ProviderResult<bool> {
160 if db_tip_block == requested_block {
163 return Ok(false)
164 }
165
166 let prune_checkpoint = provider.get_prune_checkpoint(PruneSegment::MerkleChangeSets)?;
169
170 let lower_bound = prune_checkpoint
179 .and_then(|chk| chk.block_number)
180 .map(|block_number| block_number + 1)
181 .unwrap_or_default();
182
183 let available_range = lower_bound..=db_tip_block;
184
185 if !available_range.contains(&requested_block) {
187 return Err(ProviderError::InsufficientChangesets {
188 requested: requested_block,
189 available: available_range,
190 });
191 }
192
193 Ok(true)
194 }
195
196 #[instrument(
198 level = "debug",
199 target = "providers::state::overlay",
200 skip_all,
201 fields(db_tip_block)
202 )]
203 fn calculate_overlay(
204 &self,
205 provider: &F::Provider,
206 db_tip_block: BlockNumber,
207 ) -> ProviderResult<Overlay> {
208 let retrieve_trie_reverts_duration;
211 let retrieve_hashed_state_reverts_duration;
212 let trie_updates_total_len;
213 let hashed_state_updates_total_len;
214
215 let (trie_updates, hashed_post_state) = if let Some(from_block) =
217 self.get_requested_block_number(provider)? &&
218 self.reverts_required(provider, db_tip_block, from_block)?
219 {
220 let mut trie_reverts = {
222 let _guard =
223 debug_span!(target: "providers::state::overlay", "Retrieving trie reverts")
224 .entered();
225
226 let start = Instant::now();
227 let res = provider.trie_reverts(from_block + 1)?;
228 retrieve_trie_reverts_duration = start.elapsed();
229 res
230 };
231
232 let mut hashed_state_reverts = {
234 let _guard = debug_span!(target: "providers::state::overlay", "Retrieving hashed state reverts").entered();
235
236 let start = Instant::now();
237 let res = HashedPostStateSorted::from_reverts::<KeccakKeyHasher>(
238 provider.tx_ref(),
239 from_block + 1..,
240 )?;
241 retrieve_hashed_state_reverts_duration = start.elapsed();
242 res
243 };
244
245 let trie_updates = match self.trie_overlay.as_ref() {
248 Some(trie_overlay) if trie_reverts.is_empty() => Arc::clone(trie_overlay),
249 Some(trie_overlay) => {
250 trie_reverts.extend_ref(trie_overlay);
251 Arc::new(trie_reverts)
252 }
253 None => Arc::new(trie_reverts),
254 };
255
256 let hashed_state_updates = match self.hashed_state_overlay.as_ref() {
257 Some(hashed_state_overlay) if hashed_state_reverts.is_empty() => {
258 Arc::clone(hashed_state_overlay)
259 }
260 Some(hashed_state_overlay) => {
261 hashed_state_reverts.extend_ref(hashed_state_overlay);
262 Arc::new(hashed_state_reverts)
263 }
264 None => Arc::new(hashed_state_reverts),
265 };
266
267 trie_updates_total_len = trie_updates.total_len();
268 hashed_state_updates_total_len = hashed_state_updates.total_len();
269
270 debug!(
271 target: "providers::state::overlay",
272 block_hash = ?self.block_hash,
273 ?from_block,
274 num_trie_updates = ?trie_updates_total_len,
275 num_state_updates = ?hashed_state_updates_total_len,
276 "Reverted to target block",
277 );
278
279 (trie_updates, hashed_state_updates)
280 } else {
281 let trie_updates =
283 self.trie_overlay.clone().unwrap_or_else(|| Arc::new(TrieUpdatesSorted::default()));
284 let hashed_state = self
285 .hashed_state_overlay
286 .clone()
287 .unwrap_or_else(|| Arc::new(HashedPostStateSorted::default()));
288
289 retrieve_trie_reverts_duration = Duration::ZERO;
290 retrieve_hashed_state_reverts_duration = Duration::ZERO;
291 trie_updates_total_len = trie_updates.total_len();
292 hashed_state_updates_total_len = hashed_state.total_len();
293
294 (trie_updates, hashed_state)
295 };
296
297 self.metrics
299 .retrieve_trie_reverts_duration
300 .record(retrieve_trie_reverts_duration.as_secs_f64());
301 self.metrics
302 .retrieve_hashed_state_reverts_duration
303 .record(retrieve_hashed_state_reverts_duration.as_secs_f64());
304 self.metrics.trie_updates_size.record(trie_updates_total_len as f64);
305 self.metrics.hashed_state_size.record(hashed_state_updates_total_len as f64);
306
307 Ok(Overlay { trie_updates, hashed_post_state })
308 }
309
310 #[instrument(level = "debug", target = "providers::state::overlay", skip_all)]
313 fn get_overlay(&self, provider: &F::Provider) -> ProviderResult<Overlay> {
314 if self.block_hash.is_none() {
317 let trie_updates =
318 self.trie_overlay.clone().unwrap_or_else(|| Arc::new(TrieUpdatesSorted::default()));
319 let hashed_post_state = self
320 .hashed_state_overlay
321 .clone()
322 .unwrap_or_else(|| Arc::new(HashedPostStateSorted::default()));
323 return Ok(Overlay { trie_updates, hashed_post_state })
324 }
325
326 let db_tip_block = self.get_db_tip_block_number(provider)?;
327
328 if let Some(overlay) = self.overlay_cache.as_ref().read().get(&db_tip_block) {
330 return Ok(overlay.clone());
331 }
332
333 let mut cache_miss = false;
337 let overlay = match self.overlay_cache.as_ref().write().entry(db_tip_block) {
338 Entry::Occupied(entry) => entry.get().clone(),
339 Entry::Vacant(entry) => {
340 cache_miss = true;
341 let overlay = self.calculate_overlay(provider, db_tip_block)?;
342 entry.insert(overlay.clone());
343 overlay
344 }
345 };
346
347 if cache_miss {
348 self.metrics.overlay_cache_misses.increment(1);
349 }
350
351 Ok(overlay)
352 }
353}
354
355impl<F> DatabaseProviderROFactory for OverlayStateProviderFactory<F>
356where
357 F: DatabaseProviderFactory,
358 F::Provider: TrieReader + StageCheckpointReader + PruneCheckpointReader + BlockNumReader,
359{
360 type Provider = OverlayStateProvider<F::Provider>;
361
362 #[instrument(level = "debug", target = "providers::state::overlay", skip_all)]
364 fn database_provider_ro(&self) -> ProviderResult<OverlayStateProvider<F::Provider>> {
365 let overall_start = Instant::now();
366
367 let provider = {
369 let _guard =
370 debug_span!(target: "providers::state::overlay", "Creating db provider").entered();
371
372 let start = Instant::now();
373 let res = self.factory.database_provider_ro()?;
374 self.metrics.create_provider_duration.record(start.elapsed());
375 res
376 };
377
378 let Overlay { trie_updates, hashed_post_state } = self.get_overlay(&provider)?;
379
380 self.metrics.database_provider_ro_duration.record(overall_start.elapsed());
381 Ok(OverlayStateProvider::new(provider, trie_updates, hashed_post_state))
382 }
383}
384
385#[derive(Debug)]
391pub struct OverlayStateProvider<Provider: DBProvider> {
392 provider: Provider,
393 trie_updates: Arc<TrieUpdatesSorted>,
394 hashed_post_state: Arc<HashedPostStateSorted>,
395}
396
397impl<Provider> OverlayStateProvider<Provider>
398where
399 Provider: DBProvider,
400{
401 pub const fn new(
404 provider: Provider,
405 trie_updates: Arc<TrieUpdatesSorted>,
406 hashed_post_state: Arc<HashedPostStateSorted>,
407 ) -> Self {
408 Self { provider, trie_updates, hashed_post_state }
409 }
410}
411
412impl<Provider> TrieCursorFactory for OverlayStateProvider<Provider>
413where
414 Provider: DBProvider,
415{
416 type AccountTrieCursor<'a>
417 = <InMemoryTrieCursorFactory<
418 DatabaseTrieCursorFactory<&'a Provider::Tx>,
419 &'a TrieUpdatesSorted,
420 > as TrieCursorFactory>::AccountTrieCursor<'a>
421 where
422 Self: 'a;
423
424 type StorageTrieCursor<'a>
425 = <InMemoryTrieCursorFactory<
426 DatabaseTrieCursorFactory<&'a Provider::Tx>,
427 &'a TrieUpdatesSorted,
428 > as TrieCursorFactory>::StorageTrieCursor<'a>
429 where
430 Self: 'a;
431
432 fn account_trie_cursor(&self) -> Result<Self::AccountTrieCursor<'_>, DatabaseError> {
433 let db_trie_cursor_factory = DatabaseTrieCursorFactory::new(self.provider.tx_ref());
434 let trie_cursor_factory =
435 InMemoryTrieCursorFactory::new(db_trie_cursor_factory, self.trie_updates.as_ref());
436 trie_cursor_factory.account_trie_cursor()
437 }
438
439 fn storage_trie_cursor(
440 &self,
441 hashed_address: B256,
442 ) -> Result<Self::StorageTrieCursor<'_>, DatabaseError> {
443 let db_trie_cursor_factory = DatabaseTrieCursorFactory::new(self.provider.tx_ref());
444 let trie_cursor_factory =
445 InMemoryTrieCursorFactory::new(db_trie_cursor_factory, self.trie_updates.as_ref());
446 trie_cursor_factory.storage_trie_cursor(hashed_address)
447 }
448}
449
450impl<Provider> HashedCursorFactory for OverlayStateProvider<Provider>
451where
452 Provider: DBProvider,
453{
454 type AccountCursor<'a>
455 = <HashedPostStateCursorFactory<
456 DatabaseHashedCursorFactory<&'a Provider::Tx>,
457 &'a Arc<HashedPostStateSorted>,
458 > as HashedCursorFactory>::AccountCursor<'a>
459 where
460 Self: 'a;
461
462 type StorageCursor<'a>
463 = <HashedPostStateCursorFactory<
464 DatabaseHashedCursorFactory<&'a Provider::Tx>,
465 &'a Arc<HashedPostStateSorted>,
466 > as HashedCursorFactory>::StorageCursor<'a>
467 where
468 Self: 'a;
469
470 fn hashed_account_cursor(&self) -> Result<Self::AccountCursor<'_>, DatabaseError> {
471 let db_hashed_cursor_factory = DatabaseHashedCursorFactory::new(self.provider.tx_ref());
472 let hashed_cursor_factory =
473 HashedPostStateCursorFactory::new(db_hashed_cursor_factory, &self.hashed_post_state);
474 hashed_cursor_factory.hashed_account_cursor()
475 }
476
477 fn hashed_storage_cursor(
478 &self,
479 hashed_address: B256,
480 ) -> Result<Self::StorageCursor<'_>, DatabaseError> {
481 let db_hashed_cursor_factory = DatabaseHashedCursorFactory::new(self.provider.tx_ref());
482 let hashed_cursor_factory =
483 HashedPostStateCursorFactory::new(db_hashed_cursor_factory, &self.hashed_post_state);
484 hashed_cursor_factory.hashed_storage_cursor(hashed_address)
485 }
486}