1use crate::{
5 bench::{
6 context::BenchContext,
7 helpers::parse_duration,
8 metrics_scraper::MetricsScraper,
9 output::{
10 write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
11 },
12 },
13 valid_payload::{
14 block_to_new_payload, call_forkchoice_updated_with_reth, call_new_payload_with_reth,
15 },
16};
17use alloy_provider::{ext::DebugApi, Provider};
18use alloy_rpc_types_engine::ForkchoiceState;
19use clap::Parser;
20use eyre::{Context, OptionExt};
21use futures::{stream, StreamExt, TryStreamExt};
22use reth_cli_runner::CliContext;
23use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
24use reth_node_core::args::BenchmarkArgs;
25use std::time::{Duration, Instant};
26use tracing::{debug, info, warn};
27
/// Command-line arguments for the benchmark run by [`Command::execute`], which fetches blocks
/// over RPC and replays each one as a new-payload call followed by a forkchoice-updated call.
#[derive(Debug, Parser)]
pub struct Command {
    /// The RPC URL used to set up the benchmark context.
    #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
    rpc_url: String,

    /// Optional delay to sleep after each block has been processed, before the next one is sent.
    // Parsed by `parse_duration`; the accepted textual formats are defined by that helper.
    #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
    wait_time: Option<Duration>,

    /// Persistence threshold. Defaults to the engine's default persistence threshold.
    // NOTE(review): this field is not read by `execute` in this file — confirm whether it is
    // still consumed elsewhere or only kept for CLI compatibility.
    #[arg(
        long = "persistence-threshold",
        value_name = "PERSISTENCE_THRESHOLD",
        default_value_t = DEFAULT_PERSISTENCE_THRESHOLD,
        verbatim_doc_comment
    )]
    persistence_threshold: u64,

    /// Persistence timeout. Defaults to 120s.
    // NOTE(review): this field is not read by `execute` in this file — confirm whether it is
    // still consumed elsewhere or only kept for CLI compatibility.
    #[arg(
        long = "persistence-timeout",
        value_name = "PERSISTENCE_TIMEOUT",
        value_parser = parse_duration,
        default_value = "120s",
        verbatim_doc_comment
    )]
    persistence_timeout: Duration,

    /// Maximum number of block fetches kept in flight ahead of the benchmark loop. Defaults to 20.
    #[arg(
        long = "rpc-block-buffer-size",
        value_name = "RPC_BLOCK_BUFFER_SIZE",
        default_value = "20",
        verbatim_doc_comment
    )]
    rpc_block_buffer_size: usize,

    // Shared benchmark arguments, flattened into this command's CLI. `execute` passes them to
    // `BenchContext::new` and reads `metrics_url` and `output` from them.
    #[command(flatten)]
    benchmark: BenchmarkArgs,
}
81
82impl Command {
83 pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
85 if let Some(duration) = self.wait_time {
87 info!(target: "reth-bench", "Using wait-time mode with {}ms delay between blocks", duration.as_millis());
88 }
89
90 let BenchContext {
91 benchmark_mode,
92 block_provider,
93 auth_provider,
94 next_block,
95 is_optimism,
96 use_reth_namespace,
97 rlp_blocks,
98 wait_for_persistence,
99 no_wait_for_caches,
100 } = BenchContext::new(&self.benchmark, self.rpc_url).await?;
101
102 let total_blocks = benchmark_mode.total_blocks();
103
104 let mut metrics_scraper = MetricsScraper::maybe_new(self.benchmark.metrics_url.clone());
105
106 if use_reth_namespace {
107 info!("Using reth_newPayload and reth_forkchoiceUpdated endpoints");
108 }
109
110 let buffer_size = self.rpc_block_buffer_size;
111
112 let mut blocks = Box::pin(
113 stream::iter((next_block..)
114 .take_while(|next_block| {
115 benchmark_mode.contains(*next_block)
116 }))
117 .map(|next_block| {
118 let block_provider = block_provider.clone();
119 async move {
120 let block_res = block_provider
121 .get_block_by_number(next_block.into())
122 .full()
123 .await
124 .wrap_err_with(|| {
125 format!("Failed to fetch block by number {next_block}")
126 });
127 let block =
128 match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) {
129 Ok(block) => block,
130 Err(e) => {
131 tracing::error!(target: "reth-bench", "Failed to fetch block {next_block}: {e}");
132 return Err(e)
133 }
134 };
135
136 let rlp = if rlp_blocks {
137 let rlp = match block_provider
138 .debug_get_raw_block(next_block.into())
139 .await
140 {
141 Ok(rlp) => rlp,
142 Err(e) => {
143 tracing::error!(target: "reth-bench", "Failed to fetch raw block {next_block}: {e}");
144 return Err(e.into())
145 }
146 };
147 Some(rlp)
148 } else {
149 None
150 };
151
152 let head_block_hash = block.header.hash;
153 let safe_block_hash = block_provider
154 .get_block_by_number(block.header.number.saturating_sub(32).into());
155
156 let finalized_block_hash = block_provider
157 .get_block_by_number(block.header.number.saturating_sub(64).into());
158
159 let (safe, finalized) =
160 tokio::join!(safe_block_hash, finalized_block_hash);
161
162 let safe_block_hash = match safe {
163 Ok(Some(block)) => block.header.hash,
164 Ok(None) | Err(_) => head_block_hash,
165 };
166
167 let finalized_block_hash = match finalized {
168 Ok(Some(block)) => block.header.hash,
169 Ok(None) | Err(_) => head_block_hash,
170 };
171
172 Ok((block, head_block_hash, safe_block_hash, finalized_block_hash, rlp))
173 }
174 })
175 .buffered(buffer_size),
176 );
177
178 let mut results = Vec::new();
179 let mut blocks_processed = 0u64;
180 let total_benchmark_duration = Instant::now();
181 let mut total_wait_time = Duration::ZERO;
182
183 while let Some((block, head, safe, finalized, rlp)) = {
184 let wait_start = Instant::now();
185 let result = blocks.try_next().await?;
186 total_wait_time += wait_start.elapsed();
187 result
188 } {
189 let gas_used = block.header.gas_used;
190 let gas_limit = block.header.gas_limit;
191 let block_number = block.header.number;
192 let transaction_count = block.transactions.len() as u64;
193
194 debug!(target: "reth-bench", ?block_number, "Sending payload");
195
196 let forkchoice_state = ForkchoiceState {
197 head_block_hash: head,
198 safe_block_hash: safe,
199 finalized_block_hash: finalized,
200 };
201
202 let (version, params) = block_to_new_payload(
203 block,
204 is_optimism,
205 rlp,
206 use_reth_namespace,
207 wait_for_persistence,
208 no_wait_for_caches,
209 )?;
210 let start = Instant::now();
211 let server_timings =
212 call_new_payload_with_reth(&auth_provider, version, params).await?;
213
214 let np_latency =
215 server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
216 let new_payload_result = NewPayloadResult {
217 gas_used,
218 latency: np_latency,
219 persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait),
220 execution_cache_wait: server_timings
221 .as_ref()
222 .map(|t| t.execution_cache_wait)
223 .unwrap_or_default(),
224 sparse_trie_wait: server_timings
225 .as_ref()
226 .map(|t| t.sparse_trie_wait)
227 .unwrap_or_default(),
228 };
229
230 let fcu_start = Instant::now();
231 call_forkchoice_updated_with_reth(&auth_provider, version, forkchoice_state).await?;
232 let fcu_latency = fcu_start.elapsed();
233
234 let total_latency = if server_timings.is_some() {
235 np_latency + fcu_latency
239 } else {
240 start.elapsed()
241 };
242 let combined_result = CombinedResult {
243 block_number,
244 gas_limit,
245 transaction_count,
246 new_payload_result,
247 fcu_latency,
248 total_latency,
249 };
250
251 blocks_processed += 1;
254 let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
255 let progress = match total_blocks {
256 Some(total) => format!("{blocks_processed}/{total}"),
257 None => format!("{blocks_processed}"),
258 };
259 info!(target: "reth-bench", progress, %combined_result);
260
261 if let Some(scraper) = metrics_scraper.as_mut() &&
262 let Err(err) = scraper.scrape_after_block(block_number).await
263 {
264 warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
265 }
266
267 if let Some(wait_time) = self.wait_time {
268 tokio::time::sleep(wait_time).await;
269 }
270
271 let gas_row =
272 TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
273 results.push((gas_row, combined_result));
274 }
275
276 let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
277 results.into_iter().unzip();
278
279 if let Some(ref path) = self.benchmark.output {
280 write_benchmark_results(path, &gas_output_results, &combined_results)?;
281 }
282
283 if let (Some(path), Some(scraper)) = (&self.benchmark.output, &metrics_scraper) {
284 scraper.write_csv(path)?;
285 }
286
287 let gas_output =
288 TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
289
290 info!(
291 target: "reth-bench",
292 total_gas_used = gas_output.total_gas_used,
293 total_duration = ?gas_output.total_duration,
294 execution_duration = ?gas_output.execution_duration,
295 blocks_processed = gas_output.blocks_processed,
296 wall_clock_ggas_per_second = format_args!("{:.4}", gas_output.total_gigagas_per_second()),
297 execution_ggas_per_second = format_args!("{:.4}", gas_output.execution_gigagas_per_second()),
298 "Benchmark complete"
299 );
300
301 Ok(())
302 }
303}