reth_e2e_test_utils/
node.rs

1use crate::{network::NetworkTestContext, payload::PayloadTestContext, rpc::RpcTestContext};
2use alloy_consensus::{transaction::TxHashRef, BlockHeader};
3use alloy_eips::BlockId;
4use alloy_primitives::{BlockHash, BlockNumber, Bytes, Sealable, B256};
5use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV5, ForkchoiceState};
6use alloy_rpc_types_eth::BlockNumberOrTag;
7use eyre::Ok;
8use futures_util::Future;
9use jsonrpsee::{core::client::ClientT, http_client::HttpClient};
10use reth_chainspec::EthereumHardforks;
11use reth_network_api::test_utils::PeersHandleProvider;
12use reth_node_api::{
13    Block, BlockBody, BlockTy, EngineApiMessageVersion, FullNodeComponents, PayloadTypes,
14    PrimitivesTy,
15};
16use reth_node_builder::{rpc::RethRpcAddOns, FullNode, NodeTypes};
17
18use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes};
19use reth_provider::{
20    BlockReader, BlockReaderIdExt, CanonStateNotificationStream, CanonStateSubscriptions,
21    HeaderProvider, StageCheckpointReader,
22};
23use reth_rpc_api::TestingBuildBlockRequestV1;
24use reth_rpc_builder::auth::AuthServerHandle;
25use reth_rpc_eth_api::helpers::{EthApiSpec, EthTransactions, TraceExt};
26use reth_stages_types::StageId;
27use std::pin::Pin;
28use tokio_stream::StreamExt;
29use url::Url;
30
31/// A helper struct to handle node actions
32#[expect(missing_debug_implementations)]
33pub struct NodeTestContext<Node, AddOns>
34where
35    Node: FullNodeComponents,
36    AddOns: RethRpcAddOns<Node>,
37{
38    /// The core structure representing the full node.
39    pub inner: FullNode<Node, AddOns>,
40    /// Context for testing payload-related features.
41    pub payload: PayloadTestContext<<Node::Types as NodeTypes>::Payload>,
42    /// Context for testing network functionalities.
43    pub network: NetworkTestContext<Node::Network>,
44    /// Context for testing RPC features.
45    pub rpc: RpcTestContext<Node, AddOns::EthApi>,
46    /// Canonical state events.
47    pub canonical_stream: CanonStateNotificationStream<PrimitivesTy<Node::Types>>,
48}
49
50impl<Node, Payload, AddOns> NodeTestContext<Node, AddOns>
51where
52    Payload: PayloadTypes,
53    Node: FullNodeComponents,
54    Node::Types: NodeTypes<ChainSpec: EthereumHardforks, Payload = Payload>,
55    Node::Network: PeersHandleProvider,
56    AddOns: RethRpcAddOns<Node>,
57{
58    /// Creates a new test node
59    pub async fn new(
60        node: FullNode<Node, AddOns>,
61        attributes_generator: impl Fn(u64) -> Payload::PayloadBuilderAttributes + Send + Sync + 'static,
62    ) -> eyre::Result<Self> {
63        Ok(Self {
64            inner: node.clone(),
65            payload: PayloadTestContext::new(
66                node.payload_builder_handle.clone(),
67                attributes_generator,
68            )
69            .await?,
70            network: NetworkTestContext::new(node.network.clone()),
71            rpc: RpcTestContext { inner: node.add_ons_handle.rpc_registry },
72            canonical_stream: node.provider.canonical_state_stream(),
73        })
74    }
75
76    /// Establish a connection to the node
77    pub async fn connect(&mut self, node: &mut Self) {
78        self.network.add_peer(node.network.record()).await;
79        node.network.next_session_established().await;
80        self.network.next_session_established().await;
81    }
82
83    /// Advances the chain `length` blocks.
84    ///
85    /// Returns the added chain as a Vec of block hashes.
86    pub async fn advance(
87        &mut self,
88        length: u64,
89        tx_generator: impl Fn(u64) -> Pin<Box<dyn Future<Output = Bytes>>>,
90    ) -> eyre::Result<Vec<Payload::BuiltPayload>>
91    where
92        AddOns::EthApi: EthApiSpec<Provider: BlockReader<Block = BlockTy<Node::Types>>>
93            + EthTransactions
94            + TraceExt,
95    {
96        let mut chain = Vec::with_capacity(length as usize);
97        for i in 0..length {
98            let raw_tx = tx_generator(i).await;
99            let tx_hash = self.rpc.inject_tx(raw_tx).await?;
100            let payload = self.advance_block().await?;
101            let block_hash = payload.block().hash();
102            let block_number = payload.block().number();
103            self.assert_new_block(tx_hash, block_hash, block_number).await?;
104            chain.push(payload);
105        }
106        Ok(chain)
107    }
108
109    /// Creates a new payload from given attributes generator
110    /// expects a payload attribute event and waits until the payload is built.
111    ///
112    /// It triggers the resolve payload via engine api and expects the built payload event.
113    pub async fn new_payload(&mut self) -> eyre::Result<Payload::BuiltPayload> {
114        // trigger new payload building draining the pool
115        let eth_attr = self.payload.new_payload().await.unwrap();
116        // first event is the payload attributes
117        self.payload.expect_attr_event(eth_attr.clone()).await?;
118        // wait for the payload builder to have finished building
119        self.payload.wait_for_built_payload(eth_attr.payload_id()).await;
120        // ensure we're also receiving the built payload as event
121        Ok(self.payload.expect_built_payload().await?)
122    }
123
124    /// Triggers payload building job and submits it to the engine.
125    pub async fn build_and_submit_payload(&mut self) -> eyre::Result<Payload::BuiltPayload> {
126        let payload = self.new_payload().await?;
127
128        self.submit_payload(payload.clone()).await?;
129
130        Ok(payload)
131    }
132
133    /// Advances the node forward one block
134    pub async fn advance_block(&mut self) -> eyre::Result<Payload::BuiltPayload> {
135        let payload = self.build_and_submit_payload().await?;
136
137        // trigger forkchoice update via engine api to commit the block to the blockchain
138        self.update_forkchoice(payload.block().hash(), payload.block().hash()).await?;
139
140        Ok(payload)
141    }
142
143    /// Waits for block to be available on node.
144    pub async fn wait_block(
145        &self,
146        number: BlockNumber,
147        expected_block_hash: BlockHash,
148        wait_finish_checkpoint: bool,
149    ) -> eyre::Result<()> {
150        let mut check = !wait_finish_checkpoint;
151        loop {
152            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
153
154            if !check &&
155                wait_finish_checkpoint &&
156                let Some(checkpoint) =
157                    self.inner.provider.get_stage_checkpoint(StageId::Finish)? &&
158                checkpoint.block_number >= number
159            {
160                check = true
161            }
162
163            if check {
164                if let Some(latest_header) = self.inner.provider.header_by_number(number)? {
165                    assert_eq!(latest_header.hash_slow(), expected_block_hash);
166                    break
167                }
168                assert!(
169                    !wait_finish_checkpoint,
170                    "Finish checkpoint matches, but could not fetch block."
171                );
172            }
173        }
174        Ok(())
175    }
176
177    /// Waits for the node to unwind to the given block number
178    pub async fn wait_unwind(&self, number: BlockNumber) -> eyre::Result<()> {
179        loop {
180            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
181            if let Some(checkpoint) = self.inner.provider.get_stage_checkpoint(StageId::Headers)? &&
182                checkpoint.block_number == number
183            {
184                break
185            }
186        }
187        Ok(())
188    }
189
190    /// Asserts that a new block has been added to the blockchain
191    /// and the tx has been included in the block.
192    ///
193    /// Does NOT work for pipeline since there's no stream notification!
194    pub async fn assert_new_block(
195        &mut self,
196        tip_tx_hash: B256,
197        block_hash: B256,
198        block_number: BlockNumber,
199    ) -> eyre::Result<()> {
200        // get head block from notifications stream and verify the tx has been pushed to the
201        // pool is actually present in the canonical block
202        let head = self.canonical_stream.next().await.unwrap();
203        let tx = head.tip().body().transactions().first();
204        assert_eq!(tx.unwrap().tx_hash().as_slice(), tip_tx_hash.as_slice());
205
206        loop {
207            // wait for the block to commit
208            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
209            if let Some(latest_block) =
210                self.inner.provider.block_by_number_or_tag(BlockNumberOrTag::Latest)? &&
211                latest_block.header().number() == block_number
212            {
213                // make sure the block hash we submitted via FCU engine api is the new latest
214                // block using an RPC call
215                assert_eq!(latest_block.header().hash_slow(), block_hash);
216                break
217            }
218        }
219        Ok(())
220    }
221
222    /// Gets block hash by number.
223    pub fn block_hash(&self, number: u64) -> BlockHash {
224        self.inner
225            .provider
226            .sealed_header_by_number_or_tag(BlockNumberOrTag::Number(number))
227            .unwrap()
228            .unwrap()
229            .hash()
230    }
231
232    /// Sends FCU and waits for the node to sync to the given block.
233    pub async fn sync_to(&self, block: BlockHash) -> eyre::Result<()> {
234        let start = std::time::Instant::now();
235
236        while self
237            .inner
238            .provider
239            .sealed_header_by_id(BlockId::Number(BlockNumberOrTag::Latest))?
240            .is_none_or(|h| h.hash() != block)
241        {
242            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
243            self.update_forkchoice(block, block).await?;
244
245            assert!(start.elapsed() <= std::time::Duration::from_secs(40), "timed out");
246        }
247
248        // Hack to make sure that all components have time to process canonical state update.
249        // Otherwise, this might result in e.g "nonce too low" errors when advancing chain further,
250        // making tests flaky.
251        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
252
253        Ok(())
254    }
255
256    /// Sends a forkchoice update message to the engine.
257    pub async fn update_forkchoice(&self, current_head: B256, new_head: B256) -> eyre::Result<()> {
258        self.inner
259            .add_ons_handle
260            .beacon_engine_handle
261            .fork_choice_updated(
262                ForkchoiceState {
263                    head_block_hash: new_head,
264                    safe_block_hash: current_head,
265                    finalized_block_hash: current_head,
266                },
267                None,
268                EngineApiMessageVersion::default(),
269            )
270            .await?;
271
272        Ok(())
273    }
274
275    /// Sends forkchoice update to the engine api with a zero finalized hash
276    pub async fn update_optimistic_forkchoice(&self, hash: B256) -> eyre::Result<()> {
277        self.update_forkchoice(B256::ZERO, hash).await
278    }
279
280    /// Submits a payload to the engine.
281    pub async fn submit_payload(&self, payload: Payload::BuiltPayload) -> eyre::Result<B256> {
282        let block_hash = payload.block().hash();
283        self.inner
284            .add_ons_handle
285            .beacon_engine_handle
286            .new_payload(Payload::block_to_payload(payload.block().clone()))
287            .await?;
288
289        Ok(block_hash)
290    }
291
292    /// Returns the RPC URL.
293    pub fn rpc_url(&self) -> Url {
294        let addr = self.inner.rpc_server_handle().http_local_addr().unwrap();
295        format!("http://{addr}").parse().unwrap()
296    }
297
298    /// Returns an RPC client.
299    pub fn rpc_client(&self) -> Option<HttpClient> {
300        self.inner.rpc_server_handle().http_client()
301    }
302
303    /// Returns an Engine API client.
304    pub fn auth_server_handle(&self) -> AuthServerHandle {
305        self.inner.auth_server_handle().clone()
306    }
307
308    /// Creates a [`crate::testsuite::NodeClient`] from this test context.
309    ///
310    /// This helper method extracts the necessary handles and creates a client
311    /// that can interact with both the regular RPC and Engine API endpoints.
312    /// It automatically includes the beacon engine handle for direct consensus engine interaction.
313    pub fn to_node_client(&self) -> eyre::Result<crate::testsuite::NodeClient<Payload>> {
314        let rpc = self
315            .rpc_client()
316            .ok_or_else(|| eyre::eyre!("Failed to create HTTP RPC client for node"))?;
317        let auth = self.auth_server_handle();
318        let url = self.rpc_url();
319        let beacon_handle = self.inner.add_ons_handle.beacon_engine_handle.clone();
320
321        Ok(crate::testsuite::NodeClient::new_with_beacon_engine(rpc, auth, url, beacon_handle))
322    }
323
324    /// Calls the `testing_buildBlockV1` RPC on this node.
325    ///
326    /// This endpoint builds a block using the provided parent, payload attributes, and
327    /// transactions. Requires the `Testing` RPC module to be enabled.
328    pub async fn testing_build_block_v1(
329        &self,
330        request: TestingBuildBlockRequestV1,
331    ) -> eyre::Result<ExecutionPayloadEnvelopeV5> {
332        let client =
333            self.rpc_client().ok_or_else(|| eyre::eyre!("HTTP RPC client not available"))?;
334
335        let res: ExecutionPayloadEnvelopeV5 =
336            client.request("testing_buildBlockV1", [request]).await?;
337        eyre::Ok(res)
338    }
339}