reth_ress_provider/
pending_state.rs

1use alloy_consensus::BlockHeader as _;
2use alloy_primitives::{
3    map::{B256HashSet, B256Map},
4    BlockNumber, B256,
5};
6use futures::StreamExt;
7use parking_lot::RwLock;
8use reth_chain_state::ExecutedBlockWithTrieUpdates;
9use reth_ethereum_primitives::EthPrimitives;
10use reth_node_api::{BeaconConsensusEngineEvent, NodePrimitives};
11use reth_primitives_traits::{Bytecode, RecoveredBlock};
12use reth_provider::BlockNumReader;
13use reth_tokio_util::EventStream;
14use std::{collections::BTreeMap, sync::Arc};
15use tracing::*;
16
17/// Pending state for [`crate::RethRessProtocolProvider`].
18#[derive(Clone, Default, Debug)]
19pub struct PendingState<N: NodePrimitives>(Arc<RwLock<PendingStateInner<N>>>);
20
21#[derive(Default, Debug)]
22struct PendingStateInner<N: NodePrimitives> {
23    blocks_by_hash: B256Map<ExecutedBlockWithTrieUpdates<N>>,
24    invalid_blocks_by_hash: B256Map<Arc<RecoveredBlock<N::Block>>>,
25    block_hashes_by_number: BTreeMap<BlockNumber, B256HashSet>,
26}
27
28impl<N: NodePrimitives> PendingState<N> {
29    /// Insert executed block with trie updates.
30    pub fn insert_block(&self, block: ExecutedBlockWithTrieUpdates<N>) {
31        let mut this = self.0.write();
32        let block_hash = block.recovered_block.hash();
33        this.block_hashes_by_number
34            .entry(block.recovered_block.number())
35            .or_default()
36            .insert(block_hash);
37        this.blocks_by_hash.insert(block_hash, block);
38    }
39
40    /// Insert invalid block.
41    pub fn insert_invalid_block(&self, block: Arc<RecoveredBlock<N::Block>>) {
42        let mut this = self.0.write();
43        let block_hash = block.hash();
44        this.block_hashes_by_number.entry(block.number()).or_default().insert(block_hash);
45        this.invalid_blocks_by_hash.insert(block_hash, block);
46    }
47
48    /// Returns only valid executed blocks by hash.
49    pub fn executed_block(&self, hash: &B256) -> Option<ExecutedBlockWithTrieUpdates<N>> {
50        self.0.read().blocks_by_hash.get(hash).cloned()
51    }
52
53    /// Returns valid recovered block.
54    pub fn recovered_block(&self, hash: &B256) -> Option<Arc<RecoveredBlock<N::Block>>> {
55        self.executed_block(hash).map(|b| b.recovered_block.clone())
56    }
57
58    /// Returns invalid recovered block.
59    pub fn invalid_recovered_block(&self, hash: &B256) -> Option<Arc<RecoveredBlock<N::Block>>> {
60        self.0.read().invalid_blocks_by_hash.get(hash).cloned()
61    }
62
63    /// Find bytecode in executed blocks state.
64    pub fn find_bytecode(&self, code_hash: B256) -> Option<Bytecode> {
65        let this = self.0.read();
66        for block in this.blocks_by_hash.values() {
67            if let Some(contract) = block.execution_output.bytecode(&code_hash) {
68                return Some(contract);
69            }
70        }
71        None
72    }
73
74    /// Remove all blocks before the specified block number.
75    pub fn remove_before(&self, block_number: BlockNumber) -> u64 {
76        let mut removed = 0;
77        let mut this = self.0.write();
78        while this
79            .block_hashes_by_number
80            .first_key_value()
81            .is_some_and(|(number, _)| number <= &block_number)
82        {
83            let (_, block_hashes) = this.block_hashes_by_number.pop_first().unwrap();
84            for block_hash in block_hashes {
85                removed += 1;
86                this.blocks_by_hash.remove(&block_hash);
87                this.invalid_blocks_by_hash.remove(&block_hash);
88            }
89        }
90        removed
91    }
92}
93
94/// A task to maintain pending state based on consensus engine events.
95pub async fn maintain_pending_state<P>(
96    mut events: EventStream<BeaconConsensusEngineEvent<EthPrimitives>>,
97    provider: P,
98    pending_state: PendingState<EthPrimitives>,
99) where
100    P: BlockNumReader,
101{
102    while let Some(event) = events.next().await {
103        match event {
104            BeaconConsensusEngineEvent::CanonicalBlockAdded(block, _) |
105            BeaconConsensusEngineEvent::ForkBlockAdded(block, _) => {
106                trace!(target: "reth::ress_provider", block = ? block.recovered_block().num_hash(), "Insert block into pending state");
107                pending_state.insert_block(block);
108            }
109            BeaconConsensusEngineEvent::InvalidBlock(block) => {
110                if let Ok(block) = block.try_recover() {
111                    trace!(target: "reth::ress_provider", block = ?block.num_hash(), "Insert invalid block into pending state");
112                    pending_state.insert_invalid_block(Arc::new(block));
113                }
114            }
115            BeaconConsensusEngineEvent::ForkchoiceUpdated(state, status) => {
116                if status.is_valid() {
117                    let target = state.finalized_block_hash;
118                    if let Ok(Some(block_number)) = provider.block_number(target) {
119                        let count = pending_state.remove_before(block_number);
120                        trace!(target: "reth::ress_provider", block_number, count, "Removing blocks before finalized");
121                    }
122                }
123            }
124            // ignore
125            BeaconConsensusEngineEvent::CanonicalChainCommitted(_, _) |
126            BeaconConsensusEngineEvent::LiveSyncProgress(_) => (),
127        }
128    }
129}