reth_bench/bench/
new_payload_only.rs

1//! Runs the `reth bench` command, sending only newPayload, without a forkchoiceUpdated call.
2
3use crate::{
4    bench::{
5        context::BenchContext,
6        output::{
7            NewPayloadResult, TotalGasOutput, TotalGasRow, GAS_OUTPUT_SUFFIX,
8            NEW_PAYLOAD_OUTPUT_SUFFIX,
9        },
10    },
11    valid_payload::{block_to_new_payload, call_new_payload},
12};
13use alloy_provider::Provider;
14use clap::Parser;
15use csv::Writer;
16use eyre::{Context, OptionExt};
17use reth_cli_runner::CliContext;
18use reth_node_core::args::BenchmarkArgs;
19use std::time::{Duration, Instant};
20use tracing::{debug, info};
21
22/// `reth benchmark new-payload-only` command
23#[derive(Debug, Parser)]
24pub struct Command {
25    /// The RPC url to use for getting data.
26    #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
27    rpc_url: String,
28
29    /// The size of the block buffer (channel capacity) for prefetching blocks from the RPC
30    /// endpoint.
31    #[arg(
32        long = "rpc-block-buffer-size",
33        value_name = "RPC_BLOCK_BUFFER_SIZE",
34        default_value = "20",
35        verbatim_doc_comment
36    )]
37    rpc_block_buffer_size: usize,
38
39    #[command(flatten)]
40    benchmark: BenchmarkArgs,
41}
42
43impl Command {
44    /// Execute `benchmark new-payload-only` command
45    pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
46        let BenchContext {
47            benchmark_mode,
48            block_provider,
49            auth_provider,
50            mut next_block,
51            is_optimism,
52        } = BenchContext::new(&self.benchmark, self.rpc_url).await?;
53
54        let buffer_size = self.rpc_block_buffer_size;
55
56        // Use a oneshot channel to propagate errors from the spawned task
57        let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel();
58        let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size);
59
60        tokio::task::spawn(async move {
61            while benchmark_mode.contains(next_block) {
62                let block_res = block_provider
63                    .get_block_by_number(next_block.into())
64                    .full()
65                    .await
66                    .wrap_err_with(|| format!("Failed to fetch block by number {next_block}"));
67                let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) {
68                    Ok(block) => block,
69                    Err(e) => {
70                        tracing::error!("Failed to fetch block {next_block}: {e}");
71                        let _ = error_sender.send(e);
72                        break;
73                    }
74                };
75                let header = block.header.clone();
76
77                let (version, params) = match block_to_new_payload(block, is_optimism) {
78                    Ok(result) => result,
79                    Err(e) => {
80                        tracing::error!("Failed to convert block to new payload: {e}");
81                        let _ = error_sender.send(e);
82                        break;
83                    }
84                };
85
86                next_block += 1;
87                if let Err(e) = sender.send((header, version, params)).await {
88                    tracing::error!("Failed to send block data: {e}");
89                    break;
90                }
91            }
92        });
93
94        // put results in a summary vec so they can be printed at the end
95        let mut results = Vec::new();
96        let total_benchmark_duration = Instant::now();
97        let mut total_wait_time = Duration::ZERO;
98
99        while let Some((header, version, params)) = {
100            let wait_start = Instant::now();
101            let result = receiver.recv().await;
102            total_wait_time += wait_start.elapsed();
103            result
104        } {
105            // just put gas used here
106            let gas_used = header.gas_used;
107
108            let block_number = header.number;
109
110            debug!(
111                target: "reth-bench",
112                number=?header.number,
113                "Sending payload to engine",
114            );
115
116            let start = Instant::now();
117            call_new_payload(&auth_provider, version, params).await?;
118
119            let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
120            info!(%new_payload_result);
121
122            // current duration since the start of the benchmark minus the time
123            // waiting for blocks
124            let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
125
126            // record the current result
127            let row = TotalGasRow { block_number, gas_used, time: current_duration };
128            results.push((row, new_payload_result));
129        }
130
131        // Check if the spawned task encountered an error
132        if let Ok(error) = error_receiver.try_recv() {
133            return Err(error);
134        }
135
136        let (gas_output_results, new_payload_results): (_, Vec<NewPayloadResult>) =
137            results.into_iter().unzip();
138
139        // write the csv output to files
140        if let Some(path) = self.benchmark.output {
141            // first write the new payload results to a file
142            let output_path = path.join(NEW_PAYLOAD_OUTPUT_SUFFIX);
143            info!("Writing newPayload call latency output to file: {:?}", output_path);
144            let mut writer = Writer::from_path(output_path)?;
145            for result in new_payload_results {
146                writer.serialize(result)?;
147            }
148            writer.flush()?;
149
150            // now write the gas output to a file
151            let output_path = path.join(GAS_OUTPUT_SUFFIX);
152            info!("Writing total gas output to file: {:?}", output_path);
153            let mut writer = Writer::from_path(output_path)?;
154            for row in &gas_output_results {
155                writer.serialize(row)?;
156            }
157            writer.flush()?;
158
159            info!("Finished writing benchmark output files to {:?}.", path);
160        }
161
162        // accumulate the results and calculate the overall Ggas/s
163        let gas_output = TotalGasOutput::new(gas_output_results)?;
164        info!(
165            total_duration=?gas_output.total_duration,
166            total_gas_used=?gas_output.total_gas_used,
167            blocks_processed=?gas_output.blocks_processed,
168            "Total Ggas/s: {:.4}",
169            gas_output.total_gigagas_per_second()
170        );
171
172        Ok(())
173    }
174}