Skip to main content

reth_bench/bench/
context.rs

1//! This contains the [`BenchContext`], which is information that all replay-based benchmarks need.
2//! The initialization code is also the same, so this can be shared across benchmark commands.
3
4use crate::{
5    authenticated_transport::AuthenticatedTransportConnect,
6    bench::generate_big_block::BigBlocksInitialState, bench_mode::BenchMode,
7};
8use alloy_eips::BlockNumberOrTag;
9use alloy_primitives::B256;
10use alloy_provider::{network::AnyNetwork, Provider, RootProvider};
11use alloy_rpc_client::ClientBuilder;
12use alloy_rpc_types_engine::JwtSecret;
13use alloy_transport::layers::{RateLimitRetryPolicy, RetryBackoffLayer};
14use futures::{stream, StreamExt, TryStreamExt};
15use reqwest::Url;
16use reth_node_core::args::{BenchmarkArgs, WaitForPersistence};
17use tracing::info;
18
19/// This is intended to be used by benchmarks that replay blocks from an RPC.
20///
21/// It contains an authenticated provider for engine API queries, a block provider for block
22/// queries, a [`BenchMode`] to determine whether the benchmark should run for a closed or open
23/// range of blocks, and the next block to fetch.
24pub(crate) struct BenchContext {
25    /// The auth provider is used for engine API queries.
26    pub(crate) auth_provider: RootProvider<AnyNetwork>,
27    /// The block provider is used for block queries.
28    pub(crate) block_provider: RootProvider<AnyNetwork>,
29    /// The local regular RPC provider is used for non-authenticated node RPCs like `testing_*`.
30    pub(crate) local_rpc_provider: RootProvider<AnyNetwork>,
31    /// The benchmark mode, which defines whether the benchmark should run for a closed or open
32    /// range of blocks.
33    pub(crate) benchmark_mode: BenchMode,
34    /// The next block to fetch.
35    pub(crate) next_block: u64,
36    /// Whether to use `reth_newPayload` endpoint instead of `engine_newPayload*`.
37    pub(crate) use_reth_namespace: bool,
38    /// Whether to fetch and replay RLP-encoded blocks.
39    pub(crate) rlp_blocks: bool,
40    /// Controls when `reth_newPayload` waits for persistence.
41    pub(crate) wait_for_persistence: WaitForPersistence,
42    /// Whether to skip waiting for caches (pass `wait_for_caches: false`).
43    pub(crate) no_wait_for_caches: bool,
44    /// Initial state for generated big blocks.
45    pub(crate) big_blocks_initial_state: Option<BigBlocksInitialState>,
46}
47
48impl BenchContext {
49    /// This is the initialization code for most benchmarks, taking in a [`BenchmarkArgs`] and
50    /// returning the providers needed to run a benchmark.
51    pub(crate) async fn new(bench_args: &BenchmarkArgs, rpc_url: String) -> eyre::Result<Self> {
52        info!(target: "reth-bench", "Running benchmark using data from RPC URL: {}", rpc_url);
53
54        // Ensure that output directory exists and is a directory
55        if let Some(output) = &bench_args.output {
56            if output.is_file() {
57                return Err(eyre::eyre!("Output path must be a directory"));
58            }
59            // Create the directory if it doesn't exist
60            if !output.exists() {
61                std::fs::create_dir_all(output)?;
62                info!(target: "reth-bench", "Created output directory: {:?}", output);
63            }
64        }
65
66        // set up alloy client for blocks, retrying on any errors, whether HTTP or OS
67        let retry_policy = RateLimitRetryPolicy::default().or(|_| true);
68        let max_retries = bench_args.rpc_block_fetch_retries.as_max_retries();
69        let client = ClientBuilder::default()
70            .layer(RetryBackoffLayer::new_with_policy(max_retries, 800, u64::MAX, retry_policy))
71            .http(rpc_url.parse()?);
72        let block_provider = RootProvider::<AnyNetwork>::new(client);
73
74        // construct the authenticated provider
75        let auth_jwt = bench_args
76            .auth_jwtsecret
77            .clone()
78            .ok_or_else(|| eyre::eyre!("--jwt-secret must be provided for authenticated RPC"))?;
79
80        // fetch jwt from file
81        //
82        // the jwt is hex encoded so we will decode it after
83        let jwt = std::fs::read_to_string(auth_jwt)?;
84        let jwt = JwtSecret::from_hex(jwt)?;
85
86        // get engine url
87        let auth_url = Url::parse(&bench_args.engine_rpc_url)?;
88
89        // construct the authed transport
90        info!(target: "reth-bench", "Connecting to Engine RPC at {} for replay", auth_url);
91        let auth_transport = AuthenticatedTransportConnect::new(auth_url, jwt);
92        let client = ClientBuilder::default().connect_with(auth_transport).await?;
93        let auth_provider = RootProvider::<AnyNetwork>::new(client);
94
95        let local_rpc_url = Url::parse(&bench_args.local_rpc_url)?;
96        info!(target: "reth-bench", "Connecting to local regular RPC at {} for testing namespace calls", local_rpc_url);
97        let local_rpc_provider =
98            RootProvider::<AnyNetwork>::new(ClientBuilder::default().http(local_rpc_url));
99
100        // Computes the block range for the benchmark.
101        //
102        // - If `--advance` is provided, fetches the latest block from the engine and sets:
103        //     - `from = head + 1`
104        //     - `to = head + advance`
105        // - If only `--to` is provided, fetches the latest block from the engine and sets:
106        //     - `from = head`
107        // - Otherwise, uses the values from `--from` and `--to`.
108        let mut big_blocks_initial_state = None;
109        let (from, to) = if let Some(advance) = bench_args.advance {
110            if advance == 0 {
111                return Err(eyre::eyre!("--advance must be greater than 0"));
112            }
113
114            let head_block = auth_provider
115                .get_block_by_number(BlockNumberOrTag::Latest)
116                .await?
117                .ok_or_else(|| eyre::eyre!("Failed to fetch latest block for --advance"))?;
118            let head_number = head_block.header.number;
119            (Some(head_number), Some(head_number + advance))
120        } else if bench_args.big_blocks.is_some() && bench_args.from.is_none() {
121            let (from, initial_state) =
122                derive_big_blocks_initial_state(&auth_provider, &block_provider).await?;
123            big_blocks_initial_state = initial_state;
124
125            (Some(from), bench_args.to)
126        } else if bench_args.from.is_none() && bench_args.to.is_some() {
127            let head_block = auth_provider
128                .get_block_by_number(BlockNumberOrTag::Latest)
129                .await?
130                .ok_or_else(|| eyre::eyre!("Failed to fetch latest block from engine"))?;
131            let head_number = head_block.header.number;
132            info!(target: "reth-bench", "No --from provided, derived from engine head: {}", head_number);
133            (Some(head_number), bench_args.to)
134        } else {
135            (bench_args.from, bench_args.to)
136        };
137
138        // If `--to` are not provided, we will run the benchmark continuously,
139        // starting at the latest block.
140        let latest_block = block_provider
141            .get_block_by_number(BlockNumberOrTag::Latest)
142            .full()
143            .await?
144            .ok_or_else(|| eyre::eyre!("Failed to fetch latest block from RPC"))?;
145        let mut benchmark_mode = BenchMode::new(from, to, latest_block.into_inner().number());
146
147        let first_block = match benchmark_mode {
148            BenchMode::Continuous(start) => {
149                block_provider.get_block_by_number(start.into()).full().await?.ok_or_else(|| {
150                    eyre::eyre!("Failed to fetch block {} from RPC for continuous mode", start)
151                })?
152            }
153            BenchMode::Range(ref mut range) => {
154                match range.next() {
155                    Some(block_number) => {
156                        // fetch first block in range
157                        block_provider
158                            .get_block_by_number(block_number.into())
159                            .full()
160                            .await?
161                            .ok_or_else(|| {
162                                eyre::eyre!("Failed to fetch block {} from RPC", block_number)
163                            })?
164                    }
165                    None => {
166                        return Err(eyre::eyre!(
167                            "Benchmark mode range is empty, please provide a larger range"
168                        ));
169                    }
170                }
171            }
172        };
173
174        let next_block = first_block.header.number + 1;
175        let rlp_blocks = bench_args.rlp_blocks;
176        let wait_for_persistence =
177            bench_args.wait_for_persistence.unwrap_or(WaitForPersistence::Never);
178        let use_reth_namespace = bench_args.reth_new_payload || rlp_blocks;
179        let no_wait_for_caches = bench_args.no_wait_for_caches;
180        Ok(Self {
181            auth_provider,
182            block_provider,
183            local_rpc_provider,
184            benchmark_mode,
185            next_block,
186            use_reth_namespace,
187            rlp_blocks,
188            wait_for_persistence,
189            no_wait_for_caches,
190            big_blocks_initial_state,
191        })
192    }
193}
194
195/// Derives the initial state for big blocks benchmark from RPC of the local node.
196async fn derive_big_blocks_initial_state(
197    local_provider: &RootProvider<AnyNetwork>,
198    source_provider: &RootProvider<AnyNetwork>,
199) -> eyre::Result<(u64, Option<BigBlocksInitialState>)> {
200    let local_head = local_provider
201        .get_block_by_number(BlockNumberOrTag::Latest)
202        .full()
203        .await?
204        .ok_or_else(|| eyre::eyre!("Failed to fetch latest block from engine"))?;
205
206    let local_head_number = local_head.header.number;
207    let local_head_hash = local_head.header.hash;
208
209    let source_block_at_local_head = source_provider
210        .get_block_by_number(local_head_number.into())
211        .await?
212        .ok_or_else(|| eyre::eyre!("Failed to fetch block {local_head_number} from RPC"))?;
213
214    // Node's tip is not synthetic, no initial state needed
215    if source_block_at_local_head.header.number == local_head_number &&
216        source_block_at_local_head.header.hash == local_head_hash
217    {
218        return Ok((local_head_number, None));
219    }
220
221    // If the tip is synthetic, derive last regular block from the last transaction the node has.
222    let last_regular_block = if let Some(tx_hash) = local_head.transactions.hashes().last() {
223        let tx = source_provider
224            .get_transaction_by_hash(tx_hash)
225            .await?
226            .ok_or_else(|| eyre::eyre!("Failed to fetch transaction {tx_hash} from RPC"))?;
227        tx.block_number
228            .ok_or_else(|| eyre::eyre!("Transaction {tx_hash} from local head is pending on RPC"))?
229    } else {
230        return Err(eyre::eyre!(
231            "Synthetic local tip has no transactions, can't derive last regular block"
232        ));
233    };
234
235    let initial_state = BigBlocksInitialState {
236        prior_block_hashes: fetch_recent_block_hashes(source_provider, last_regular_block).await?,
237        next_synthetic_block_number: local_head_number + 1,
238    };
239
240    Ok((last_regular_block, Some(initial_state)))
241}
242
243async fn fetch_recent_block_hashes(
244    provider: &RootProvider<AnyNetwork>,
245    latest_regular_block: u64,
246) -> eyre::Result<Vec<(u64, B256)>> {
247    const BLOCKHASH_HISTORY: u64 = 256;
248    const MAX_CONCURRENT_BLOCK_HASH_REQUESTS: usize = 5;
249
250    let start = latest_regular_block.saturating_sub(BLOCKHASH_HISTORY - 1);
251    let hashes = stream::iter(start..=latest_regular_block)
252        .map(|block_number| async move {
253            provider
254                .get_block_by_number(block_number.into())
255                .await
256                .map(|block| block.map(|block| (block_number, block.header.hash)))
257        })
258        .buffered(MAX_CONCURRENT_BLOCK_HASH_REQUESTS)
259        .try_filter_map(|block_hash| async move { Ok(block_hash) })
260        .try_collect()
261        .await?;
262
263    Ok(hashes)
264}