reth_bench/bench/
new_payload_only.rs1use crate::{
4 bench::{
5 context::BenchContext,
6 metrics_scraper::MetricsScraper,
7 output::{
8 NewPayloadResult, TotalGasOutput, TotalGasRow, GAS_OUTPUT_SUFFIX,
9 NEW_PAYLOAD_OUTPUT_SUFFIX,
10 },
11 },
12 valid_payload::{block_to_new_payload, call_new_payload_with_reth},
13};
14use alloy_provider::{ext::DebugApi, Provider};
15use clap::Parser;
16use csv::Writer;
17use eyre::{Context, OptionExt};
18use reth_cli_runner::CliContext;
19use reth_node_core::args::BenchmarkArgs;
20use std::time::{Duration, Instant};
21use tracing::{debug, info};
22
23#[derive(Debug, Parser)]
25pub struct Command {
26 #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
28 rpc_url: String,
29
30 #[arg(
33 long = "rpc-block-buffer-size",
34 value_name = "RPC_BLOCK_BUFFER_SIZE",
35 default_value = "20",
36 verbatim_doc_comment
37 )]
38 rpc_block_buffer_size: usize,
39
40 #[command(flatten)]
41 benchmark: BenchmarkArgs,
42}
43
44impl Command {
45 pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
47 let BenchContext {
48 benchmark_mode,
49 block_provider,
50 auth_provider,
51 mut next_block,
52 use_reth_namespace,
53 rlp_blocks,
54 wait_for_persistence,
55 no_wait_for_caches,
56 } = BenchContext::new(&self.benchmark, self.rpc_url).await?;
57
58 let total_blocks = benchmark_mode.total_blocks();
59
60 let mut metrics_scraper = MetricsScraper::maybe_new(self.benchmark.metrics_url.clone());
61
62 if use_reth_namespace {
63 info!("Using reth_newPayload endpoint");
64 }
65
66 let buffer_size = self.rpc_block_buffer_size;
67
68 let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel();
70 let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size);
71
72 tokio::task::spawn(async move {
73 while benchmark_mode.contains(next_block) {
74 let block_res = block_provider
75 .get_block_by_number(next_block.into())
76 .full()
77 .await
78 .wrap_err_with(|| format!("Failed to fetch block by number {next_block}"));
79 let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) {
80 Ok(block) => block,
81 Err(e) => {
82 tracing::error!(target: "reth-bench", "Failed to fetch block {next_block}: {e}");
83 let _ = error_sender.send(e);
84 break;
85 }
86 };
87
88 let rlp = if rlp_blocks {
89 let Ok(rlp) = block_provider.debug_get_raw_block(next_block.into()).await
90 else {
91 tracing::error!(target: "reth-bench", "Failed to fetch raw block {next_block}");
92 let _ = error_sender
93 .send(eyre::eyre!("Failed to fetch raw block {next_block}"));
94 break;
95 };
96 Some(rlp)
97 } else {
98 None
99 };
100
101 next_block += 1;
102 if let Err(e) = sender.send((block, rlp)).await {
103 tracing::error!(target: "reth-bench", "Failed to send block data: {e}");
104 break;
105 }
106 }
107 });
108
109 let mut results = Vec::new();
110 let mut blocks_processed = 0u64;
111 let total_benchmark_duration = Instant::now();
112 let mut total_wait_time = Duration::ZERO;
113
114 while let Some((block, rlp)) = {
115 let wait_start = Instant::now();
116 let result = receiver.recv().await;
117 total_wait_time += wait_start.elapsed();
118 result
119 } {
120 let block_number = block.header.number;
121 let transaction_count = block.transactions.len() as u64;
122 let gas_used = block.header.gas_used;
123
124 debug!(target: "reth-bench", number=?block.header.number, "Sending payload to engine");
125
126 let (version, params) = block_to_new_payload(
127 block,
128 rlp,
129 use_reth_namespace,
130 wait_for_persistence,
131 no_wait_for_caches,
132 )?;
133
134 let start = Instant::now();
135 let server_timings =
136 call_new_payload_with_reth(&auth_provider, version, params).await?;
137
138 let latency =
139 server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
140 let new_payload_result = NewPayloadResult {
141 gas_used,
142 latency,
143 persistence_wait: server_timings
144 .as_ref()
145 .map(|t| t.persistence_wait)
146 .unwrap_or_default(),
147 execution_cache_wait: server_timings
148 .as_ref()
149 .map(|t| t.execution_cache_wait)
150 .unwrap_or_default(),
151 sparse_trie_wait: server_timings
152 .as_ref()
153 .map(|t| t.sparse_trie_wait)
154 .unwrap_or_default(),
155 };
156 blocks_processed += 1;
157 let progress = match total_blocks {
158 Some(total) => format!("{blocks_processed}/{total}"),
159 None => format!("{blocks_processed}"),
160 };
161 info!(target: "reth-bench", progress, %new_payload_result);
162
163 let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
166
167 let row =
169 TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
170 results.push((row, new_payload_result));
171
172 if let Some(scraper) = metrics_scraper.as_mut() &&
173 let Err(err) = scraper.scrape_after_block(block_number).await
174 {
175 tracing::warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
176 }
177 }
178
179 if let Ok(error) = error_receiver.try_recv() {
181 return Err(error);
182 }
183
184 let (gas_output_results, new_payload_results): (_, Vec<NewPayloadResult>) =
185 results.into_iter().unzip();
186
187 if let Some(path) = self.benchmark.output {
189 let output_path = path.join(NEW_PAYLOAD_OUTPUT_SUFFIX);
191 info!(target: "reth-bench", "Writing newPayload call latency output to file: {:?}", output_path);
192 let mut writer = Writer::from_path(output_path)?;
193 for result in new_payload_results {
194 writer.serialize(result)?;
195 }
196 writer.flush()?;
197
198 let output_path = path.join(GAS_OUTPUT_SUFFIX);
200 info!(target: "reth-bench", "Writing total gas output to file: {:?}", output_path);
201 let mut writer = Writer::from_path(output_path)?;
202 for row in &gas_output_results {
203 writer.serialize(row)?;
204 }
205 writer.flush()?;
206
207 if let Some(scraper) = &metrics_scraper {
208 scraper.write_csv(&path)?;
209 }
210
211 info!(target: "reth-bench", "Finished writing benchmark output files to {:?}.", path);
212 }
213
214 let gas_output = TotalGasOutput::new(gas_output_results)?;
216 info!(
217 target: "reth-bench",
218 total_duration=?gas_output.total_duration,
219 total_gas_used=?gas_output.total_gas_used,
220 blocks_processed=?gas_output.blocks_processed,
221 "Total Ggas/s: {:.4}",
222 gas_output.total_gigagas_per_second()
223 );
224
225 Ok(())
226 }
227}