reth_ress_provider/
pending_state.rs
1use alloy_consensus::BlockHeader as _;
2use alloy_primitives::{
3 map::{B256HashSet, B256Map},
4 BlockNumber, B256,
5};
6use futures::StreamExt;
7use parking_lot::RwLock;
8use reth_chain_state::ExecutedBlockWithTrieUpdates;
9use reth_ethereum_primitives::EthPrimitives;
10use reth_node_api::{BeaconConsensusEngineEvent, NodePrimitives};
11use reth_primitives_traits::{Bytecode, RecoveredBlock};
12use reth_provider::BlockNumReader;
13use reth_tokio_util::EventStream;
14use std::{collections::BTreeMap, sync::Arc};
15use tracing::*;
16
17#[derive(Clone, Default, Debug)]
19pub struct PendingState<N: NodePrimitives>(Arc<RwLock<PendingStateInner<N>>>);
20
21#[derive(Default, Debug)]
22struct PendingStateInner<N: NodePrimitives> {
23 blocks_by_hash: B256Map<ExecutedBlockWithTrieUpdates<N>>,
24 invalid_blocks_by_hash: B256Map<Arc<RecoveredBlock<N::Block>>>,
25 block_hashes_by_number: BTreeMap<BlockNumber, B256HashSet>,
26}
27
28impl<N: NodePrimitives> PendingState<N> {
29 pub fn insert_block(&self, block: ExecutedBlockWithTrieUpdates<N>) {
31 let mut this = self.0.write();
32 let block_hash = block.recovered_block.hash();
33 this.block_hashes_by_number
34 .entry(block.recovered_block.number())
35 .or_default()
36 .insert(block_hash);
37 this.blocks_by_hash.insert(block_hash, block);
38 }
39
40 pub fn insert_invalid_block(&self, block: Arc<RecoveredBlock<N::Block>>) {
42 let mut this = self.0.write();
43 let block_hash = block.hash();
44 this.block_hashes_by_number.entry(block.number()).or_default().insert(block_hash);
45 this.invalid_blocks_by_hash.insert(block_hash, block);
46 }
47
48 pub fn executed_block(&self, hash: &B256) -> Option<ExecutedBlockWithTrieUpdates<N>> {
50 self.0.read().blocks_by_hash.get(hash).cloned()
51 }
52
53 pub fn recovered_block(&self, hash: &B256) -> Option<Arc<RecoveredBlock<N::Block>>> {
55 self.executed_block(hash).map(|b| b.recovered_block.clone())
56 }
57
58 pub fn invalid_recovered_block(&self, hash: &B256) -> Option<Arc<RecoveredBlock<N::Block>>> {
60 self.0.read().invalid_blocks_by_hash.get(hash).cloned()
61 }
62
63 pub fn find_bytecode(&self, code_hash: B256) -> Option<Bytecode> {
65 let this = self.0.read();
66 for block in this.blocks_by_hash.values() {
67 if let Some(contract) = block.execution_output.bytecode(&code_hash) {
68 return Some(contract);
69 }
70 }
71 None
72 }
73
74 pub fn remove_before(&self, block_number: BlockNumber) -> u64 {
76 let mut removed = 0;
77 let mut this = self.0.write();
78 while this
79 .block_hashes_by_number
80 .first_key_value()
81 .is_some_and(|(number, _)| number <= &block_number)
82 {
83 let (_, block_hashes) = this.block_hashes_by_number.pop_first().unwrap();
84 for block_hash in block_hashes {
85 removed += 1;
86 this.blocks_by_hash.remove(&block_hash);
87 this.invalid_blocks_by_hash.remove(&block_hash);
88 }
89 }
90 removed
91 }
92}
93
94pub async fn maintain_pending_state<P>(
96 mut events: EventStream<BeaconConsensusEngineEvent<EthPrimitives>>,
97 provider: P,
98 pending_state: PendingState<EthPrimitives>,
99) where
100 P: BlockNumReader,
101{
102 while let Some(event) = events.next().await {
103 match event {
104 BeaconConsensusEngineEvent::CanonicalBlockAdded(block, _) |
105 BeaconConsensusEngineEvent::ForkBlockAdded(block, _) => {
106 trace!(target: "reth::ress_provider", block = ? block.recovered_block().num_hash(), "Insert block into pending state");
107 pending_state.insert_block(block);
108 }
109 BeaconConsensusEngineEvent::InvalidBlock(block) => {
110 if let Ok(block) = block.try_recover() {
111 trace!(target: "reth::ress_provider", block = ?block.num_hash(), "Insert invalid block into pending state");
112 pending_state.insert_invalid_block(Arc::new(block));
113 }
114 }
115 BeaconConsensusEngineEvent::ForkchoiceUpdated(state, status) => {
116 if status.is_valid() {
117 let target = state.finalized_block_hash;
118 if let Ok(Some(block_number)) = provider.block_number(target) {
119 let count = pending_state.remove_before(block_number);
120 trace!(target: "reth::ress_provider", block_number, count, "Removing blocks before finalized");
121 }
122 }
123 }
124 BeaconConsensusEngineEvent::CanonicalChainCommitted(_, _) |
126 BeaconConsensusEngineEvent::LiveSyncProgress(_) => (),
127 }
128 }
129}