reth_bench/bench/
context.rs1use crate::{
5 authenticated_transport::AuthenticatedTransportConnect,
6 bench::generate_big_block::BigBlocksInitialState, bench_mode::BenchMode,
7};
8use alloy_eips::BlockNumberOrTag;
9use alloy_primitives::B256;
10use alloy_provider::{network::AnyNetwork, Provider, RootProvider};
11use alloy_rpc_client::ClientBuilder;
12use alloy_rpc_types_engine::JwtSecret;
13use alloy_transport::layers::{RateLimitRetryPolicy, RetryBackoffLayer};
14use futures::{stream, StreamExt, TryStreamExt};
15use reqwest::Url;
16use reth_node_core::args::{BenchmarkArgs, WaitForPersistence};
17use tracing::info;
18
19pub(crate) struct BenchContext {
25 pub(crate) auth_provider: RootProvider<AnyNetwork>,
27 pub(crate) block_provider: RootProvider<AnyNetwork>,
29 pub(crate) local_rpc_provider: RootProvider<AnyNetwork>,
31 pub(crate) benchmark_mode: BenchMode,
34 pub(crate) next_block: u64,
36 pub(crate) use_reth_namespace: bool,
38 pub(crate) rlp_blocks: bool,
40 pub(crate) wait_for_persistence: WaitForPersistence,
42 pub(crate) no_wait_for_caches: bool,
44 pub(crate) big_blocks_initial_state: Option<BigBlocksInitialState>,
46}
47
48impl BenchContext {
49 pub(crate) async fn new(bench_args: &BenchmarkArgs, rpc_url: String) -> eyre::Result<Self> {
52 info!(target: "reth-bench", "Running benchmark using data from RPC URL: {}", rpc_url);
53
54 if let Some(output) = &bench_args.output {
56 if output.is_file() {
57 return Err(eyre::eyre!("Output path must be a directory"));
58 }
59 if !output.exists() {
61 std::fs::create_dir_all(output)?;
62 info!(target: "reth-bench", "Created output directory: {:?}", output);
63 }
64 }
65
66 let retry_policy = RateLimitRetryPolicy::default().or(|_| true);
68 let max_retries = bench_args.rpc_block_fetch_retries.as_max_retries();
69 let client = ClientBuilder::default()
70 .layer(RetryBackoffLayer::new_with_policy(max_retries, 800, u64::MAX, retry_policy))
71 .http(rpc_url.parse()?);
72 let block_provider = RootProvider::<AnyNetwork>::new(client);
73
74 let auth_jwt = bench_args
76 .auth_jwtsecret
77 .clone()
78 .ok_or_else(|| eyre::eyre!("--jwt-secret must be provided for authenticated RPC"))?;
79
80 let jwt = std::fs::read_to_string(auth_jwt)?;
84 let jwt = JwtSecret::from_hex(jwt)?;
85
86 let auth_url = Url::parse(&bench_args.engine_rpc_url)?;
88
89 info!(target: "reth-bench", "Connecting to Engine RPC at {} for replay", auth_url);
91 let auth_transport = AuthenticatedTransportConnect::new(auth_url, jwt);
92 let client = ClientBuilder::default().connect_with(auth_transport).await?;
93 let auth_provider = RootProvider::<AnyNetwork>::new(client);
94
95 let local_rpc_url = Url::parse(&bench_args.local_rpc_url)?;
96 info!(target: "reth-bench", "Connecting to local regular RPC at {} for testing namespace calls", local_rpc_url);
97 let local_rpc_provider =
98 RootProvider::<AnyNetwork>::new(ClientBuilder::default().http(local_rpc_url));
99
100 let mut big_blocks_initial_state = None;
109 let (from, to) = if let Some(advance) = bench_args.advance {
110 if advance == 0 {
111 return Err(eyre::eyre!("--advance must be greater than 0"));
112 }
113
114 let head_block = auth_provider
115 .get_block_by_number(BlockNumberOrTag::Latest)
116 .await?
117 .ok_or_else(|| eyre::eyre!("Failed to fetch latest block for --advance"))?;
118 let head_number = head_block.header.number;
119 (Some(head_number), Some(head_number + advance))
120 } else if bench_args.big_blocks.is_some() && bench_args.from.is_none() {
121 let (from, initial_state) =
122 derive_big_blocks_initial_state(&auth_provider, &block_provider).await?;
123 big_blocks_initial_state = initial_state;
124
125 (Some(from), bench_args.to)
126 } else if bench_args.from.is_none() && bench_args.to.is_some() {
127 let head_block = auth_provider
128 .get_block_by_number(BlockNumberOrTag::Latest)
129 .await?
130 .ok_or_else(|| eyre::eyre!("Failed to fetch latest block from engine"))?;
131 let head_number = head_block.header.number;
132 info!(target: "reth-bench", "No --from provided, derived from engine head: {}", head_number);
133 (Some(head_number), bench_args.to)
134 } else {
135 (bench_args.from, bench_args.to)
136 };
137
138 let latest_block = block_provider
141 .get_block_by_number(BlockNumberOrTag::Latest)
142 .full()
143 .await?
144 .ok_or_else(|| eyre::eyre!("Failed to fetch latest block from RPC"))?;
145 let mut benchmark_mode = BenchMode::new(from, to, latest_block.into_inner().number());
146
147 let first_block = match benchmark_mode {
148 BenchMode::Continuous(start) => {
149 block_provider.get_block_by_number(start.into()).full().await?.ok_or_else(|| {
150 eyre::eyre!("Failed to fetch block {} from RPC for continuous mode", start)
151 })?
152 }
153 BenchMode::Range(ref mut range) => {
154 match range.next() {
155 Some(block_number) => {
156 block_provider
158 .get_block_by_number(block_number.into())
159 .full()
160 .await?
161 .ok_or_else(|| {
162 eyre::eyre!("Failed to fetch block {} from RPC", block_number)
163 })?
164 }
165 None => {
166 return Err(eyre::eyre!(
167 "Benchmark mode range is empty, please provide a larger range"
168 ));
169 }
170 }
171 }
172 };
173
174 let next_block = first_block.header.number + 1;
175 let rlp_blocks = bench_args.rlp_blocks;
176 let wait_for_persistence =
177 bench_args.wait_for_persistence.unwrap_or(WaitForPersistence::Never);
178 let use_reth_namespace = bench_args.reth_new_payload || rlp_blocks;
179 let no_wait_for_caches = bench_args.no_wait_for_caches;
180 Ok(Self {
181 auth_provider,
182 block_provider,
183 local_rpc_provider,
184 benchmark_mode,
185 next_block,
186 use_reth_namespace,
187 rlp_blocks,
188 wait_for_persistence,
189 no_wait_for_caches,
190 big_blocks_initial_state,
191 })
192 }
193}
194
195async fn derive_big_blocks_initial_state(
197 local_provider: &RootProvider<AnyNetwork>,
198 source_provider: &RootProvider<AnyNetwork>,
199) -> eyre::Result<(u64, Option<BigBlocksInitialState>)> {
200 let local_head = local_provider
201 .get_block_by_number(BlockNumberOrTag::Latest)
202 .full()
203 .await?
204 .ok_or_else(|| eyre::eyre!("Failed to fetch latest block from engine"))?;
205
206 let local_head_number = local_head.header.number;
207 let local_head_hash = local_head.header.hash;
208
209 let source_block_at_local_head = source_provider
210 .get_block_by_number(local_head_number.into())
211 .await?
212 .ok_or_else(|| eyre::eyre!("Failed to fetch block {local_head_number} from RPC"))?;
213
214 if source_block_at_local_head.header.number == local_head_number &&
216 source_block_at_local_head.header.hash == local_head_hash
217 {
218 return Ok((local_head_number, None));
219 }
220
221 let last_regular_block = if let Some(tx_hash) = local_head.transactions.hashes().last() {
223 let tx = source_provider
224 .get_transaction_by_hash(tx_hash)
225 .await?
226 .ok_or_else(|| eyre::eyre!("Failed to fetch transaction {tx_hash} from RPC"))?;
227 tx.block_number
228 .ok_or_else(|| eyre::eyre!("Transaction {tx_hash} from local head is pending on RPC"))?
229 } else {
230 return Err(eyre::eyre!(
231 "Synthetic local tip has no transactions, can't derive last regular block"
232 ));
233 };
234
235 let initial_state = BigBlocksInitialState {
236 prior_block_hashes: fetch_recent_block_hashes(source_provider, last_regular_block).await?,
237 next_synthetic_block_number: local_head_number + 1,
238 };
239
240 Ok((last_regular_block, Some(initial_state)))
241}
242
243async fn fetch_recent_block_hashes(
244 provider: &RootProvider<AnyNetwork>,
245 latest_regular_block: u64,
246) -> eyre::Result<Vec<(u64, B256)>> {
247 const BLOCKHASH_HISTORY: u64 = 256;
248 const MAX_CONCURRENT_BLOCK_HASH_REQUESTS: usize = 5;
249
250 let start = latest_regular_block.saturating_sub(BLOCKHASH_HISTORY - 1);
251 let hashes = stream::iter(start..=latest_regular_block)
252 .map(|block_number| async move {
253 provider
254 .get_block_by_number(block_number.into())
255 .await
256 .map(|block| block.map(|block| (block_number, block.header.hash)))
257 })
258 .buffered(MAX_CONCURRENT_BLOCK_HASH_REQUESTS)
259 .try_filter_map(|block_hash| async move { Ok(block_hash) })
260 .try_collect()
261 .await?;
262
263 Ok(hashes)
264}