reth_bench/bench/
new_payload_fcu.rs

1//! Runs the `reth bench` command, calling first newPayload for each block, then calling
2//! forkchoiceUpdated.
3
4use crate::{
5    bench::{
6        context::BenchContext,
7        output::{
8            CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow, COMBINED_OUTPUT_SUFFIX,
9            GAS_OUTPUT_SUFFIX,
10        },
11    },
12    valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload},
13};
14use alloy_provider::Provider;
15use alloy_rpc_types_engine::ForkchoiceState;
16use clap::Parser;
17use csv::Writer;
18use eyre::{Context, OptionExt};
19use humantime::parse_duration;
20use reth_cli_runner::CliContext;
21use reth_node_core::args::BenchmarkArgs;
22use std::time::{Duration, Instant};
23use tracing::{debug, info};
24
25/// `reth benchmark new-payload-fcu` command
26#[derive(Debug, Parser)]
27pub struct Command {
28    /// The RPC url to use for getting data.
29    #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
30    rpc_url: String,
31
32    /// How long to wait after a forkchoice update before sending the next payload.
33    #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, default_value = "250ms", verbatim_doc_comment)]
34    wait_time: Duration,
35
36    /// The size of the block buffer (channel capacity) for prefetching blocks from the RPC
37    /// endpoint.
38    #[arg(
39        long = "rpc-block-buffer-size",
40        value_name = "RPC_BLOCK_BUFFER_SIZE",
41        default_value = "20",
42        verbatim_doc_comment
43    )]
44    rpc_block_buffer_size: usize,
45
46    #[command(flatten)]
47    benchmark: BenchmarkArgs,
48}
49
50impl Command {
51    /// Execute `benchmark new-payload-fcu` command
52    pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
53        let BenchContext {
54            benchmark_mode,
55            block_provider,
56            auth_provider,
57            mut next_block,
58            is_optimism,
59        } = BenchContext::new(&self.benchmark, self.rpc_url).await?;
60
61        let buffer_size = self.rpc_block_buffer_size;
62
63        // Use a oneshot channel to propagate errors from the spawned task
64        let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel();
65        let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size);
66
67        tokio::task::spawn(async move {
68            while benchmark_mode.contains(next_block) {
69                let block_res = block_provider
70                    .get_block_by_number(next_block.into())
71                    .full()
72                    .await
73                    .wrap_err_with(|| format!("Failed to fetch block by number {next_block}"));
74                let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) {
75                    Ok(block) => block,
76                    Err(e) => {
77                        tracing::error!("Failed to fetch block {next_block}: {e}");
78                        let _ = error_sender.send(e);
79                        break;
80                    }
81                };
82
83                let head_block_hash = block.header.hash;
84                let safe_block_hash = block_provider
85                    .get_block_by_number(block.header.number.saturating_sub(32).into());
86
87                let finalized_block_hash = block_provider
88                    .get_block_by_number(block.header.number.saturating_sub(64).into());
89
90                let (safe, finalized) = tokio::join!(safe_block_hash, finalized_block_hash,);
91
92                let safe_block_hash = match safe {
93                    Ok(Some(block)) => block.header.hash,
94                    Ok(None) | Err(_) => head_block_hash,
95                };
96
97                let finalized_block_hash = match finalized {
98                    Ok(Some(block)) => block.header.hash,
99                    Ok(None) | Err(_) => head_block_hash,
100                };
101
102                next_block += 1;
103                if let Err(e) = sender
104                    .send((block, head_block_hash, safe_block_hash, finalized_block_hash))
105                    .await
106                {
107                    tracing::error!("Failed to send block data: {e}");
108                    break;
109                }
110            }
111        });
112
113        // put results in a summary vec so they can be printed at the end
114        let mut results = Vec::new();
115        let total_benchmark_duration = Instant::now();
116        let mut total_wait_time = Duration::ZERO;
117
118        while let Some((block, head, safe, finalized)) = {
119            let wait_start = Instant::now();
120            let result = receiver.recv().await;
121            total_wait_time += wait_start.elapsed();
122            result
123        } {
124            // just put gas used here
125            let gas_used = block.header.gas_used;
126            let block_number = block.header.number;
127            let transaction_count = block.transactions.len() as u64;
128
129            debug!(target: "reth-bench", ?block_number, "Sending payload",);
130
131            // construct fcu to call
132            let forkchoice_state = ForkchoiceState {
133                head_block_hash: head,
134                safe_block_hash: safe,
135                finalized_block_hash: finalized,
136            };
137
138            let (version, params) = block_to_new_payload(block, is_optimism)?;
139            let start = Instant::now();
140            call_new_payload(&auth_provider, version, params).await?;
141
142            let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
143
144            call_forkchoice_updated(&auth_provider, version, forkchoice_state, None).await?;
145
146            // calculate the total duration and the fcu latency, record
147            let total_latency = start.elapsed();
148            let fcu_latency = total_latency - new_payload_result.latency;
149            let combined_result = CombinedResult {
150                block_number,
151                transaction_count,
152                new_payload_result,
153                fcu_latency,
154                total_latency,
155            };
156
157            // current duration since the start of the benchmark minus the time
158            // waiting for blocks
159            let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
160
161            // convert gas used to gigagas, then compute gigagas per second
162            info!(%combined_result);
163
164            // wait before sending the next payload
165            tokio::time::sleep(self.wait_time).await;
166
167            // record the current result
168            let gas_row =
169                TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
170            results.push((gas_row, combined_result));
171        }
172
173        // Check if the spawned task encountered an error
174        if let Ok(error) = error_receiver.try_recv() {
175            return Err(error);
176        }
177
178        let (gas_output_results, combined_results): (_, Vec<CombinedResult>) =
179            results.into_iter().unzip();
180
181        // write the csv output to files
182        if let Some(path) = self.benchmark.output {
183            // first write the combined results to a file
184            let output_path = path.join(COMBINED_OUTPUT_SUFFIX);
185            info!("Writing engine api call latency output to file: {:?}", output_path);
186            let mut writer = Writer::from_path(output_path)?;
187            for result in combined_results {
188                writer.serialize(result)?;
189            }
190            writer.flush()?;
191
192            // now write the gas output to a file
193            let output_path = path.join(GAS_OUTPUT_SUFFIX);
194            info!("Writing total gas output to file: {:?}", output_path);
195            let mut writer = Writer::from_path(output_path)?;
196            for row in &gas_output_results {
197                writer.serialize(row)?;
198            }
199            writer.flush()?;
200
201            info!("Finished writing benchmark output files to {:?}.", path);
202        }
203
204        // accumulate the results and calculate the overall Ggas/s
205        let gas_output = TotalGasOutput::new(gas_output_results)?;
206        info!(
207            total_duration=?gas_output.total_duration,
208            total_gas_used=?gas_output.total_gas_used,
209            blocks_processed=?gas_output.blocks_processed,
210            "Total Ggas/s: {:.4}",
211            gas_output.total_gigagas_per_second()
212        );
213
214        Ok(())
215    }
216}