reth_exex_test_utils/
lib.rs

1//! Test helpers for `reth-exex`
2
3#![doc(
4    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
9#![cfg_attr(not(test), warn(unused_crate_dependencies))]
10
11use std::{
12    fmt::Debug,
13    future::{poll_fn, Future},
14    sync::Arc,
15    task::Poll,
16};
17
18use alloy_eips::BlockNumHash;
19use futures_util::FutureExt;
20use reth_chainspec::{ChainSpec, MAINNET};
21use reth_consensus::test_utils::TestConsensus;
22use reth_db::{
23    test_utils::{create_test_rw_db, create_test_static_files_dir, TempDatabase},
24    DatabaseEnv,
25};
26use reth_db_common::init::init_genesis;
27use reth_evm::test_utils::MockExecutorProvider;
28use reth_execution_types::Chain;
29use reth_exex::{ExExContext, ExExEvent, ExExNotification, ExExNotifications, Wal};
30use reth_network::{config::SecretKey, NetworkConfigBuilder, NetworkManager};
31use reth_node_api::{
32    FullNodeTypes, FullNodeTypesAdapter, NodePrimitives, NodeTypes, NodeTypesWithDBAdapter,
33    NodeTypesWithEngine,
34};
35use reth_node_builder::{
36    components::{
37        BasicPayloadServiceBuilder, Components, ComponentsBuilder, ConsensusBuilder,
38        ExecutorBuilder, NodeComponentsBuilder, PoolBuilder,
39    },
40    BuilderContext, Node, NodeAdapter, RethFullAdapter,
41};
42use reth_node_core::node_config::NodeConfig;
43use reth_node_ethereum::{
44    node::{EthereumAddOns, EthereumNetworkBuilder, EthereumPayloadBuilder},
45    EthEngineTypes, EthEvmConfig,
46};
47use reth_payload_builder::noop::NoopPayloadBuilderService;
48use reth_primitives::{EthPrimitives, RecoveredBlock, TransactionSigned};
49use reth_primitives_traits::Block as _;
50use reth_provider::{
51    providers::{BlockchainProvider, StaticFileProvider},
52    BlockReader, EthStorage, ProviderFactory,
53};
54use reth_tasks::TaskManager;
55use reth_transaction_pool::test_utils::{testing_pool, TestPool};
56use tempfile::TempDir;
57use thiserror::Error;
58use tokio::sync::mpsc::{Sender, UnboundedReceiver};
59
60/// A test [`PoolBuilder`] that builds a [`TestPool`].
61#[derive(Debug, Default, Clone, Copy)]
62#[non_exhaustive]
63pub struct TestPoolBuilder;
64
65impl<Node> PoolBuilder<Node> for TestPoolBuilder
66where
67    Node: FullNodeTypes<Types: NodeTypes<Primitives: NodePrimitives<SignedTx = TransactionSigned>>>,
68{
69    type Pool = TestPool;
70
71    async fn build_pool(self, _ctx: &BuilderContext<Node>) -> eyre::Result<Self::Pool> {
72        Ok(testing_pool())
73    }
74}
75
76/// A test [`ExecutorBuilder`] that builds a [`MockExecutorProvider`].
77#[derive(Debug, Default, Clone, Copy)]
78#[non_exhaustive]
79pub struct TestExecutorBuilder;
80
81impl<Node> ExecutorBuilder<Node> for TestExecutorBuilder
82where
83    Node: FullNodeTypes<Types: NodeTypes<ChainSpec = ChainSpec, Primitives = EthPrimitives>>,
84{
85    type EVM = EthEvmConfig;
86    type Executor = MockExecutorProvider;
87
88    async fn build_evm(
89        self,
90        ctx: &BuilderContext<Node>,
91    ) -> eyre::Result<(Self::EVM, Self::Executor)> {
92        let evm_config = EthEvmConfig::new(ctx.chain_spec());
93        let executor = MockExecutorProvider::default();
94
95        Ok((evm_config, executor))
96    }
97}
98
99/// A test [`ConsensusBuilder`] that builds a [`TestConsensus`].
100#[derive(Debug, Default, Clone, Copy)]
101#[non_exhaustive]
102pub struct TestConsensusBuilder;
103
104impl<Node> ConsensusBuilder<Node> for TestConsensusBuilder
105where
106    Node: FullNodeTypes,
107{
108    type Consensus = Arc<TestConsensus>;
109
110    async fn build_consensus(self, _ctx: &BuilderContext<Node>) -> eyre::Result<Self::Consensus> {
111        Ok(Arc::new(TestConsensus::default()))
112    }
113}
114
115/// A test [`Node`].
116#[derive(Debug, Default, Clone, Copy)]
117#[non_exhaustive]
118pub struct TestNode;
119
120impl NodeTypes for TestNode {
121    type Primitives = EthPrimitives;
122    type ChainSpec = ChainSpec;
123    type StateCommitment = reth_trie_db::MerklePatriciaTrie;
124    type Storage = EthStorage;
125}
126
127impl NodeTypesWithEngine for TestNode {
128    type Engine = EthEngineTypes;
129}
130
131impl<N> Node<N> for TestNode
132where
133    N: FullNodeTypes<
134        Types: NodeTypesWithEngine<
135            Engine = EthEngineTypes,
136            ChainSpec = ChainSpec,
137            Primitives = EthPrimitives,
138            Storage = EthStorage,
139        >,
140    >,
141{
142    type ComponentsBuilder = ComponentsBuilder<
143        N,
144        TestPoolBuilder,
145        BasicPayloadServiceBuilder<EthereumPayloadBuilder>,
146        EthereumNetworkBuilder,
147        TestExecutorBuilder,
148        TestConsensusBuilder,
149    >;
150    type AddOns = EthereumAddOns<
151        NodeAdapter<N, <Self::ComponentsBuilder as NodeComponentsBuilder<N>>::Components>,
152    >;
153
154    fn components_builder(&self) -> Self::ComponentsBuilder {
155        ComponentsBuilder::default()
156            .node_types::<N>()
157            .pool(TestPoolBuilder::default())
158            .payload(BasicPayloadServiceBuilder::default())
159            .network(EthereumNetworkBuilder::default())
160            .executor(TestExecutorBuilder::default())
161            .consensus(TestConsensusBuilder::default())
162    }
163
164    fn add_ons(&self) -> Self::AddOns {
165        EthereumAddOns::default()
166    }
167}
168
169/// A shared [`TempDatabase`] used for testing
170pub type TmpDB = Arc<TempDatabase<DatabaseEnv>>;
171/// The [`NodeAdapter`] for the [`TestExExContext`]. Contains type necessary to
172/// boot the testing environment
173pub type Adapter = NodeAdapter<
174    RethFullAdapter<TmpDB, TestNode>,
175    <<TestNode as Node<
176        FullNodeTypesAdapter<
177            TestNode,
178            TmpDB,
179            BlockchainProvider<NodeTypesWithDBAdapter<TestNode, TmpDB>>,
180        >,
181    >>::ComponentsBuilder as NodeComponentsBuilder<RethFullAdapter<TmpDB, TestNode>>>::Components,
182>;
183/// An [`ExExContext`] using the [`Adapter`] type.
184pub type TestExExContext = ExExContext<Adapter>;
185
186/// A helper type for testing Execution Extensions.
187#[derive(Debug)]
188pub struct TestExExHandle {
189    /// Genesis block that was inserted into the storage
190    pub genesis: RecoveredBlock<reth_primitives::Block>,
191    /// Provider Factory for accessing the emphemeral storage of the host node
192    pub provider_factory: ProviderFactory<NodeTypesWithDBAdapter<TestNode, TmpDB>>,
193    /// Channel for receiving events from the Execution Extension
194    pub events_rx: UnboundedReceiver<ExExEvent>,
195    /// Channel for sending notifications to the Execution Extension
196    pub notifications_tx: Sender<ExExNotification>,
197    /// Node task manager
198    pub tasks: TaskManager,
199    /// WAL temp directory handle
200    _wal_directory: TempDir,
201}
202
203impl TestExExHandle {
204    /// Send a notification to the Execution Extension that the chain has been committed
205    pub async fn send_notification_chain_committed(&self, chain: Chain) -> eyre::Result<()> {
206        self.notifications_tx
207            .send(ExExNotification::ChainCommitted { new: Arc::new(chain) })
208            .await?;
209        Ok(())
210    }
211
212    /// Send a notification to the Execution Extension that the chain has been reorged
213    pub async fn send_notification_chain_reorged(
214        &self,
215        old: Chain,
216        new: Chain,
217    ) -> eyre::Result<()> {
218        self.notifications_tx
219            .send(ExExNotification::ChainReorged { old: Arc::new(old), new: Arc::new(new) })
220            .await?;
221        Ok(())
222    }
223
224    /// Send a notification to the Execution Extension that the chain has been reverted
225    pub async fn send_notification_chain_reverted(&self, chain: Chain) -> eyre::Result<()> {
226        self.notifications_tx
227            .send(ExExNotification::ChainReverted { old: Arc::new(chain) })
228            .await?;
229        Ok(())
230    }
231
232    /// Asserts that the Execution Extension did not emit any events.
233    #[track_caller]
234    pub fn assert_events_empty(&self) {
235        assert!(self.events_rx.is_empty());
236    }
237
238    /// Asserts that the Execution Extension emitted a `FinishedHeight` event with the correct
239    /// height.
240    #[track_caller]
241    pub fn assert_event_finished_height(&mut self, height: BlockNumHash) -> eyre::Result<()> {
242        let event = self.events_rx.try_recv()?;
243        assert_eq!(event, ExExEvent::FinishedHeight(height));
244        Ok(())
245    }
246}
247
248/// Creates a new [`ExExContext`].
249///
250/// This is a convenience function that does the following:
251/// 1. Sets up an [`ExExContext`] with all dependencies.
252/// 2. Inserts the genesis block of the provided (chain spec)[`ChainSpec`] into the storage.
253/// 3. Creates a channel for receiving events from the Execution Extension.
254/// 4. Creates a channel for sending notifications to the Execution Extension.
255///
256/// # Warning
257/// The genesis block is not sent to the notifications channel. The caller is responsible for
258/// doing this.
259pub async fn test_exex_context_with_chain_spec(
260    chain_spec: Arc<ChainSpec>,
261) -> eyre::Result<(ExExContext<Adapter>, TestExExHandle)> {
262    let transaction_pool = testing_pool();
263    let evm_config = EthEvmConfig::new(chain_spec.clone());
264    let executor = MockExecutorProvider::default();
265    let consensus = Arc::new(TestConsensus::default());
266
267    let (static_dir, _) = create_test_static_files_dir();
268    let db = create_test_rw_db();
269    let provider_factory = ProviderFactory::<NodeTypesWithDBAdapter<TestNode, _>>::new(
270        db,
271        chain_spec.clone(),
272        StaticFileProvider::read_write(static_dir.into_path()).expect("static file provider"),
273    );
274
275    let genesis_hash = init_genesis(&provider_factory)?;
276    let provider = BlockchainProvider::new(provider_factory.clone())?;
277
278    let network_manager = NetworkManager::new(
279        NetworkConfigBuilder::new(SecretKey::new(&mut rand::thread_rng()))
280            .with_unused_discovery_port()
281            .with_unused_listener_port()
282            .build(provider_factory.clone()),
283    )
284    .await?;
285    let network = network_manager.handle().clone();
286    let tasks = TaskManager::current();
287    let task_executor = tasks.executor();
288    tasks.executor().spawn(network_manager);
289
290    let (_, payload_builder_handle) = NoopPayloadBuilderService::<EthEngineTypes>::new();
291
292    let components = NodeAdapter::<FullNodeTypesAdapter<_, _, _>, _> {
293        components: Components {
294            transaction_pool,
295            evm_config,
296            executor,
297            consensus,
298            network,
299            payload_builder_handle,
300        },
301        task_executor,
302        provider,
303    };
304
305    let genesis = provider_factory
306        .block_by_hash(genesis_hash)?
307        .ok_or_else(|| eyre::eyre!("genesis block not found"))?
308        .seal_slow()
309        .try_recover()?;
310
311    let head = genesis.num_hash();
312
313    let wal_directory = tempfile::tempdir()?;
314    let wal = Wal::new(wal_directory.path())?;
315
316    let (events_tx, events_rx) = tokio::sync::mpsc::unbounded_channel();
317    let (notifications_tx, notifications_rx) = tokio::sync::mpsc::channel(1);
318    let notifications = ExExNotifications::new(
319        head,
320        components.provider.clone(),
321        components.components.executor.clone(),
322        notifications_rx,
323        wal.handle(),
324    );
325
326    let ctx = ExExContext {
327        head,
328        config: NodeConfig::test(),
329        reth_config: reth_config::Config::default(),
330        events: events_tx,
331        notifications,
332        components,
333    };
334
335    Ok((
336        ctx,
337        TestExExHandle {
338            genesis,
339            provider_factory,
340            events_rx,
341            notifications_tx,
342            tasks,
343            _wal_directory: wal_directory,
344        },
345    ))
346}
347
348/// Creates a new [`ExExContext`] with (mainnet)[`MAINNET`] chain spec.
349///
350/// For more information see [`test_exex_context_with_chain_spec`].
351pub async fn test_exex_context() -> eyre::Result<(ExExContext<Adapter>, TestExExHandle)> {
352    test_exex_context_with_chain_spec(MAINNET.clone()).await
353}
354
355/// An extension trait for polling an Execution Extension future.
356pub trait PollOnce {
357    /// Polls the given Execution Extension future __once__. The future should be
358    /// (pinned)[`std::pin::pin`].
359    ///
360    /// # Returns
361    /// - `Ok(())` if the future returned [`Poll::Pending`]. The future can be polled again.
362    /// - `Err(PollOnceError::FutureIsReady)` if the future returned [`Poll::Ready`] without an
363    ///   error. The future should never resolve.
364    /// - `Err(PollOnceError::FutureError(err))` if the future returned [`Poll::Ready`] with an
365    ///   error. Something went wrong.
366    fn poll_once(&mut self) -> impl Future<Output = Result<(), PollOnceError>> + Send;
367}
368
369/// An Execution Extension future polling error.
370#[derive(Error, Debug)]
371pub enum PollOnceError {
372    /// The future returned [`Poll::Ready`] without an error, but it should never resolve.
373    #[error("Execution Extension future returned Ready, but it should never resolve")]
374    FutureIsReady,
375    /// The future returned [`Poll::Ready`] with an error.
376    #[error(transparent)]
377    FutureError(#[from] eyre::Error),
378}
379
380impl<F: Future<Output = eyre::Result<()>> + Unpin + Send> PollOnce for F {
381    async fn poll_once(&mut self) -> Result<(), PollOnceError> {
382        poll_fn(|cx| match self.poll_unpin(cx) {
383            Poll::Ready(Ok(())) => Poll::Ready(Err(PollOnceError::FutureIsReady)),
384            Poll::Ready(Err(err)) => Poll::Ready(Err(PollOnceError::FutureError(err))),
385            Poll::Pending => Poll::Ready(Ok(())),
386        })
387        .await
388    }
389}
390
391#[cfg(test)]
392mod tests {
393    use super::*;
394
395    #[tokio::test]
396    async fn check_test_context_creation() {
397        let _ = test_exex_context().await.unwrap();
398    }
399}