Skip to main content

reth_bench/bench/
new_payload_only.rs

1//! Runs the `reth bench` command, sending only newPayload, without a forkchoiceUpdated call.
2
3use crate::{
4    bench::{
5        context::BenchContext,
6        helpers::fetch_block_access_list,
7        metrics_scraper::MetricsScraper,
8        output::{
9            NewPayloadResult, TotalGasOutput, TotalGasRow, GAS_OUTPUT_SUFFIX,
10            NEW_PAYLOAD_OUTPUT_SUFFIX,
11        },
12    },
13    valid_payload::{block_to_new_payload, call_new_payload_with_reth},
14};
15use alloy_provider::{ext::DebugApi, Provider};
16use clap::Parser;
17use csv::Writer;
18use eyre::{Context, OptionExt};
19use reth_cli_runner::CliContext;
20use reth_node_core::args::BenchmarkArgs;
21use std::time::{Duration, Instant};
22use tracing::{debug, info};
23
24/// `reth benchmark new-payload-only` command
25#[derive(Debug, Parser)]
26pub struct Command {
27    /// The RPC url to use for getting data.
28    #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
29    rpc_url: String,
30
31    /// The size of the block buffer (channel capacity) for prefetching blocks from the RPC
32    /// endpoint.
33    #[arg(
34        long = "rpc-block-buffer-size",
35        value_name = "RPC_BLOCK_BUFFER_SIZE",
36        default_value = "20",
37        verbatim_doc_comment
38    )]
39    rpc_block_buffer_size: usize,
40
41    #[command(flatten)]
42    benchmark: BenchmarkArgs,
43}
44
45impl Command {
46    /// Execute `benchmark new-payload-only` command
47    pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
48        let BenchContext {
49            benchmark_mode,
50            block_provider,
51            auth_provider,
52            mut next_block,
53            use_reth_namespace,
54            rlp_blocks,
55            wait_for_persistence,
56            no_wait_for_caches,
57            ..
58        } = BenchContext::new(&self.benchmark, self.rpc_url).await?;
59
60        let total_blocks = benchmark_mode.total_blocks();
61
62        let mut metrics_scraper = MetricsScraper::maybe_new(self.benchmark.metrics_url.clone());
63
64        if use_reth_namespace {
65            info!("Using reth_newPayload endpoint");
66        }
67
68        let buffer_size = self.rpc_block_buffer_size;
69
70        // Use a oneshot channel to propagate errors from the spawned task
71        let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel();
72        let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size);
73
74        let block_provider_clone = block_provider.clone();
75        tokio::task::spawn(async move {
76            let block_provider = block_provider_clone;
77            while benchmark_mode.contains(next_block) {
78                let block_res = block_provider
79                    .get_block_by_number(next_block.into())
80                    .full()
81                    .await
82                    .wrap_err_with(|| format!("Failed to fetch block by number {next_block}"));
83                let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) {
84                    Ok(block) => block,
85                    Err(e) => {
86                        tracing::error!(target: "reth-bench", "Failed to fetch block {next_block}: {e}");
87                        let _ = error_sender.send(e);
88                        break;
89                    }
90                };
91
92                let rlp = if rlp_blocks {
93                    let Ok(rlp) = block_provider.debug_get_raw_block(next_block.into()).await
94                    else {
95                        tracing::error!(target: "reth-bench", "Failed to fetch raw block {next_block}");
96                        let _ = error_sender
97                            .send(eyre::eyre!("Failed to fetch raw block {next_block}"));
98                        break;
99                    };
100                    Some(rlp)
101                } else {
102                    None
103                };
104
105                next_block += 1;
106                if let Err(e) = sender.send((block, rlp)).await {
107                    tracing::error!(target: "reth-bench", "Failed to send block data: {e}");
108                    break;
109                }
110            }
111        });
112
113        let mut results = Vec::new();
114        let mut blocks_processed = 0u64;
115        let total_benchmark_duration = Instant::now();
116        let mut total_wait_time = Duration::ZERO;
117
118        while let Some((block, rlp)) = {
119            let wait_start = Instant::now();
120            let result = receiver.recv().await;
121            total_wait_time += wait_start.elapsed();
122            result
123        } {
124            let block_number = block.header.number;
125            let transaction_count = block.transactions.len() as u64;
126            let gas_used = block.header.gas_used;
127
128            debug!(target: "reth-bench", number=?block.header.number, "Sending payload to engine");
129
130            let bal = if rlp.is_none() && block.header.block_access_list_hash.is_some() {
131                Some(fetch_block_access_list(&block_provider, block.header.number).await?)
132            } else {
133                None
134            };
135
136            let (version, params) = block_to_new_payload(
137                block,
138                rlp,
139                use_reth_namespace,
140                wait_for_persistence,
141                no_wait_for_caches,
142                bal,
143            )?;
144
145            let start = Instant::now();
146            let server_timings =
147                call_new_payload_with_reth(&auth_provider, version, params).await?;
148
149            let latency =
150                server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
151            let new_payload_result = NewPayloadResult {
152                gas_used,
153                latency,
154                persistence_wait: server_timings
155                    .as_ref()
156                    .map(|t| t.persistence_wait)
157                    .unwrap_or_default(),
158                execution_cache_wait: server_timings
159                    .as_ref()
160                    .map(|t| t.execution_cache_wait)
161                    .unwrap_or_default(),
162                sparse_trie_wait: server_timings
163                    .as_ref()
164                    .map(|t| t.sparse_trie_wait)
165                    .unwrap_or_default(),
166            };
167            blocks_processed += 1;
168            let progress = match total_blocks {
169                Some(total) => format!("{blocks_processed}/{total}"),
170                None => format!("{blocks_processed}"),
171            };
172            info!(target: "reth-bench", progress, %new_payload_result);
173
174            // current duration since the start of the benchmark minus the time
175            // waiting for blocks
176            let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
177
178            // record the current result
179            let row =
180                TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
181            results.push((row, new_payload_result));
182
183            if let Some(scraper) = metrics_scraper.as_mut() &&
184                let Err(err) = scraper.scrape_after_block(block_number).await
185            {
186                tracing::warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
187            }
188        }
189
190        // Check if the spawned task encountered an error
191        if let Ok(error) = error_receiver.try_recv() {
192            return Err(error);
193        }
194
195        let (gas_output_results, new_payload_results): (_, Vec<NewPayloadResult>) =
196            results.into_iter().unzip();
197
198        // write the csv output to files
199        if let Some(path) = self.benchmark.output {
200            // first write the new payload results to a file
201            let output_path = path.join(NEW_PAYLOAD_OUTPUT_SUFFIX);
202            info!(target: "reth-bench", "Writing newPayload call latency output to file: {:?}", output_path);
203            let mut writer = Writer::from_path(output_path)?;
204            for result in new_payload_results {
205                writer.serialize(result)?;
206            }
207            writer.flush()?;
208
209            // now write the gas output to a file
210            let output_path = path.join(GAS_OUTPUT_SUFFIX);
211            info!(target: "reth-bench", "Writing total gas output to file: {:?}", output_path);
212            let mut writer = Writer::from_path(output_path)?;
213            for row in &gas_output_results {
214                writer.serialize(row)?;
215            }
216            writer.flush()?;
217
218            if let Some(scraper) = &metrics_scraper {
219                scraper.write_csv(&path)?;
220            }
221
222            info!(target: "reth-bench", "Finished writing benchmark output files to {:?}.", path);
223        }
224
225        // accumulate the results and calculate the overall Ggas/s
226        let gas_output = TotalGasOutput::new(gas_output_results)?;
227        info!(
228            target: "reth-bench",
229            total_duration=?gas_output.total_duration,
230            total_gas_used=?gas_output.total_gas_used,
231            blocks_processed=?gas_output.blocks_processed,
232            "Total Ggas/s: {:.4}",
233            gas_output.total_gigagas_per_second()
234        );
235
236        Ok(())
237    }
238}