reth_e2e_test_utils/
node.rs

1use crate::{network::NetworkTestContext, payload::PayloadTestContext, rpc::RpcTestContext};
2use alloy_consensus::{transaction::TxHashRef, BlockHeader};
3use alloy_eips::BlockId;
4use alloy_primitives::{BlockHash, BlockNumber, Bytes, Sealable, B256};
5use alloy_rpc_types_engine::ForkchoiceState;
6use alloy_rpc_types_eth::BlockNumberOrTag;
7use eyre::Ok;
8use futures_util::Future;
9use jsonrpsee::http_client::HttpClient;
10use reth_chainspec::EthereumHardforks;
11use reth_network_api::test_utils::PeersHandleProvider;
12use reth_node_api::{
13    Block, BlockBody, BlockTy, EngineApiMessageVersion, FullNodeComponents, PayloadTypes,
14    PrimitivesTy,
15};
16use reth_node_builder::{rpc::RethRpcAddOns, FullNode, NodeTypes};
17
18use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes};
19use reth_provider::{
20    BlockReader, BlockReaderIdExt, CanonStateNotificationStream, CanonStateSubscriptions,
21    HeaderProvider, StageCheckpointReader,
22};
23use reth_rpc_builder::auth::AuthServerHandle;
24use reth_rpc_eth_api::helpers::{EthApiSpec, EthTransactions, TraceExt};
25use reth_stages_types::StageId;
26use std::pin::Pin;
27use tokio_stream::StreamExt;
28use url::Url;
29
30/// A helper struct to handle node actions
31#[expect(missing_debug_implementations)]
32pub struct NodeTestContext<Node, AddOns>
33where
34    Node: FullNodeComponents,
35    AddOns: RethRpcAddOns<Node>,
36{
37    /// The core structure representing the full node.
38    pub inner: FullNode<Node, AddOns>,
39    /// Context for testing payload-related features.
40    pub payload: PayloadTestContext<<Node::Types as NodeTypes>::Payload>,
41    /// Context for testing network functionalities.
42    pub network: NetworkTestContext<Node::Network>,
43    /// Context for testing RPC features.
44    pub rpc: RpcTestContext<Node, AddOns::EthApi>,
45    /// Canonical state events.
46    pub canonical_stream: CanonStateNotificationStream<PrimitivesTy<Node::Types>>,
47}
48
49impl<Node, Payload, AddOns> NodeTestContext<Node, AddOns>
50where
51    Payload: PayloadTypes,
52    Node: FullNodeComponents,
53    Node::Types: NodeTypes<ChainSpec: EthereumHardforks, Payload = Payload>,
54    Node::Network: PeersHandleProvider,
55    AddOns: RethRpcAddOns<Node>,
56{
57    /// Creates a new test node
58    pub async fn new(
59        node: FullNode<Node, AddOns>,
60        attributes_generator: impl Fn(u64) -> Payload::PayloadBuilderAttributes + Send + Sync + 'static,
61    ) -> eyre::Result<Self> {
62        Ok(Self {
63            inner: node.clone(),
64            payload: PayloadTestContext::new(
65                node.payload_builder_handle.clone(),
66                attributes_generator,
67            )
68            .await?,
69            network: NetworkTestContext::new(node.network.clone()),
70            rpc: RpcTestContext { inner: node.add_ons_handle.rpc_registry },
71            canonical_stream: node.provider.canonical_state_stream(),
72        })
73    }
74
75    /// Establish a connection to the node
76    pub async fn connect(&mut self, node: &mut Self) {
77        self.network.add_peer(node.network.record()).await;
78        node.network.next_session_established().await;
79        self.network.next_session_established().await;
80    }
81
82    /// Advances the chain `length` blocks.
83    ///
84    /// Returns the added chain as a Vec of block hashes.
85    pub async fn advance(
86        &mut self,
87        length: u64,
88        tx_generator: impl Fn(u64) -> Pin<Box<dyn Future<Output = Bytes>>>,
89    ) -> eyre::Result<Vec<Payload::BuiltPayload>>
90    where
91        AddOns::EthApi: EthApiSpec<Provider: BlockReader<Block = BlockTy<Node::Types>>>
92            + EthTransactions
93            + TraceExt,
94    {
95        let mut chain = Vec::with_capacity(length as usize);
96        for i in 0..length {
97            let raw_tx = tx_generator(i).await;
98            let tx_hash = self.rpc.inject_tx(raw_tx).await?;
99            let payload = self.advance_block().await?;
100            let block_hash = payload.block().hash();
101            let block_number = payload.block().number();
102            self.assert_new_block(tx_hash, block_hash, block_number).await?;
103            chain.push(payload);
104        }
105        Ok(chain)
106    }
107
108    /// Creates a new payload from given attributes generator
109    /// expects a payload attribute event and waits until the payload is built.
110    ///
111    /// It triggers the resolve payload via engine api and expects the built payload event.
112    pub async fn new_payload(&mut self) -> eyre::Result<Payload::BuiltPayload> {
113        // trigger new payload building draining the pool
114        let eth_attr = self.payload.new_payload().await.unwrap();
115        // first event is the payload attributes
116        self.payload.expect_attr_event(eth_attr.clone()).await?;
117        // wait for the payload builder to have finished building
118        self.payload.wait_for_built_payload(eth_attr.payload_id()).await;
119        // ensure we're also receiving the built payload as event
120        Ok(self.payload.expect_built_payload().await?)
121    }
122
123    /// Triggers payload building job and submits it to the engine.
124    pub async fn build_and_submit_payload(&mut self) -> eyre::Result<Payload::BuiltPayload> {
125        let payload = self.new_payload().await?;
126
127        self.submit_payload(payload.clone()).await?;
128
129        Ok(payload)
130    }
131
132    /// Advances the node forward one block
133    pub async fn advance_block(&mut self) -> eyre::Result<Payload::BuiltPayload> {
134        let payload = self.build_and_submit_payload().await?;
135
136        // trigger forkchoice update via engine api to commit the block to the blockchain
137        self.update_forkchoice(payload.block().hash(), payload.block().hash()).await?;
138
139        Ok(payload)
140    }
141
142    /// Waits for block to be available on node.
143    pub async fn wait_block(
144        &self,
145        number: BlockNumber,
146        expected_block_hash: BlockHash,
147        wait_finish_checkpoint: bool,
148    ) -> eyre::Result<()> {
149        let mut check = !wait_finish_checkpoint;
150        loop {
151            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
152
153            if !check &&
154                wait_finish_checkpoint &&
155                let Some(checkpoint) =
156                    self.inner.provider.get_stage_checkpoint(StageId::Finish)? &&
157                checkpoint.block_number >= number
158            {
159                check = true
160            }
161
162            if check {
163                if let Some(latest_header) = self.inner.provider.header_by_number(number)? {
164                    assert_eq!(latest_header.hash_slow(), expected_block_hash);
165                    break
166                }
167                assert!(
168                    !wait_finish_checkpoint,
169                    "Finish checkpoint matches, but could not fetch block."
170                );
171            }
172        }
173        Ok(())
174    }
175
176    /// Waits for the node to unwind to the given block number
177    pub async fn wait_unwind(&self, number: BlockNumber) -> eyre::Result<()> {
178        loop {
179            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
180            if let Some(checkpoint) = self.inner.provider.get_stage_checkpoint(StageId::Headers)? &&
181                checkpoint.block_number == number
182            {
183                break
184            }
185        }
186        Ok(())
187    }
188
189    /// Asserts that a new block has been added to the blockchain
190    /// and the tx has been included in the block.
191    ///
192    /// Does NOT work for pipeline since there's no stream notification!
193    pub async fn assert_new_block(
194        &mut self,
195        tip_tx_hash: B256,
196        block_hash: B256,
197        block_number: BlockNumber,
198    ) -> eyre::Result<()> {
199        // get head block from notifications stream and verify the tx has been pushed to the
200        // pool is actually present in the canonical block
201        let head = self.canonical_stream.next().await.unwrap();
202        let tx = head.tip().body().transactions().first();
203        assert_eq!(tx.unwrap().tx_hash().as_slice(), tip_tx_hash.as_slice());
204
205        loop {
206            // wait for the block to commit
207            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
208            if let Some(latest_block) =
209                self.inner.provider.block_by_number_or_tag(BlockNumberOrTag::Latest)? &&
210                latest_block.header().number() == block_number
211            {
212                // make sure the block hash we submitted via FCU engine api is the new latest
213                // block using an RPC call
214                assert_eq!(latest_block.header().hash_slow(), block_hash);
215                break
216            }
217        }
218        Ok(())
219    }
220
221    /// Gets block hash by number.
222    pub fn block_hash(&self, number: u64) -> BlockHash {
223        self.inner
224            .provider
225            .sealed_header_by_number_or_tag(BlockNumberOrTag::Number(number))
226            .unwrap()
227            .unwrap()
228            .hash()
229    }
230
231    /// Sends FCU and waits for the node to sync to the given block.
232    pub async fn sync_to(&self, block: BlockHash) -> eyre::Result<()> {
233        let start = std::time::Instant::now();
234
235        while self
236            .inner
237            .provider
238            .sealed_header_by_id(BlockId::Number(BlockNumberOrTag::Latest))?
239            .is_none_or(|h| h.hash() != block)
240        {
241            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
242            self.update_forkchoice(block, block).await?;
243
244            assert!(start.elapsed() <= std::time::Duration::from_secs(40), "timed out");
245        }
246
247        // Hack to make sure that all components have time to process canonical state update.
248        // Otherwise, this might result in e.g "nonce too low" errors when advancing chain further,
249        // making tests flaky.
250        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
251
252        Ok(())
253    }
254
255    /// Sends a forkchoice update message to the engine.
256    pub async fn update_forkchoice(&self, current_head: B256, new_head: B256) -> eyre::Result<()> {
257        self.inner
258            .add_ons_handle
259            .beacon_engine_handle
260            .fork_choice_updated(
261                ForkchoiceState {
262                    head_block_hash: new_head,
263                    safe_block_hash: current_head,
264                    finalized_block_hash: current_head,
265                },
266                None,
267                EngineApiMessageVersion::default(),
268            )
269            .await?;
270
271        Ok(())
272    }
273
274    /// Sends forkchoice update to the engine api with a zero finalized hash
275    pub async fn update_optimistic_forkchoice(&self, hash: B256) -> eyre::Result<()> {
276        self.update_forkchoice(B256::ZERO, hash).await
277    }
278
279    /// Submits a payload to the engine.
280    pub async fn submit_payload(&self, payload: Payload::BuiltPayload) -> eyre::Result<B256> {
281        let block_hash = payload.block().hash();
282        self.inner
283            .add_ons_handle
284            .beacon_engine_handle
285            .new_payload(Payload::block_to_payload(payload.block().clone()))
286            .await?;
287
288        Ok(block_hash)
289    }
290
291    /// Returns the RPC URL.
292    pub fn rpc_url(&self) -> Url {
293        let addr = self.inner.rpc_server_handle().http_local_addr().unwrap();
294        format!("http://{addr}").parse().unwrap()
295    }
296
297    /// Returns an RPC client.
298    pub fn rpc_client(&self) -> Option<HttpClient> {
299        self.inner.rpc_server_handle().http_client()
300    }
301
302    /// Returns an Engine API client.
303    pub fn auth_server_handle(&self) -> AuthServerHandle {
304        self.inner.auth_server_handle().clone()
305    }
306
307    /// Creates a [`crate::testsuite::NodeClient`] from this test context.
308    ///
309    /// This helper method extracts the necessary handles and creates a client
310    /// that can interact with both the regular RPC and Engine API endpoints.
311    /// It automatically includes the beacon engine handle for direct consensus engine interaction.
312    pub fn to_node_client(&self) -> eyre::Result<crate::testsuite::NodeClient<Payload>> {
313        let rpc = self
314            .rpc_client()
315            .ok_or_else(|| eyre::eyre!("Failed to create HTTP RPC client for node"))?;
316        let auth = self.auth_server_handle();
317        let url = self.rpc_url();
318        let beacon_handle = self.inner.add_ons_handle.beacon_engine_handle.clone();
319
320        Ok(crate::testsuite::NodeClient::new_with_beacon_engine(rpc, auth, url, beacon_handle))
321    }
322}