reth_bench/bench/
new_payload_fcu.rs

1//! Runs the `reth bench` command, calling first newPayload for each block, then calling
2//! forkchoiceUpdated.
3
4use crate::{
5    bench::{
6        context::BenchContext,
7        output::{
8            CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow, COMBINED_OUTPUT_SUFFIX,
9            GAS_OUTPUT_SUFFIX,
10        },
11    },
12    valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload},
13};
14use alloy_provider::Provider;
15use alloy_rpc_types_engine::ForkchoiceState;
16use clap::Parser;
17use csv::Writer;
18use eyre::Context;
19use humantime::parse_duration;
20use reth_cli_runner::CliContext;
21use reth_node_core::args::BenchmarkArgs;
22use std::time::{Duration, Instant};
23use tracing::{debug, info};
24
25/// `reth benchmark new-payload-fcu` command
26#[derive(Debug, Parser)]
27pub struct Command {
28    /// The RPC url to use for getting data.
29    #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
30    rpc_url: String,
31
32    /// How long to wait after a forkchoice update before sending the next payload.
33    #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
34    wait_time: Option<Duration>,
35
36    #[command(flatten)]
37    benchmark: BenchmarkArgs,
38}
39
40impl Command {
41    /// Execute `benchmark new-payload-fcu` command
42    pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
43        let BenchContext {
44            benchmark_mode,
45            block_provider,
46            auth_provider,
47            mut next_block,
48            is_optimism,
49        } = BenchContext::new(&self.benchmark, self.rpc_url).await?;
50
51        let (sender, mut receiver) = tokio::sync::mpsc::channel(1000);
52        tokio::task::spawn(async move {
53            while benchmark_mode.contains(next_block) {
54                let block_res = block_provider
55                    .get_block_by_number(next_block.into())
56                    .full()
57                    .await
58                    .wrap_err_with(|| format!("Failed to fetch block by number {next_block}"));
59                let block = block_res.unwrap().unwrap();
60                let header = block.header.clone();
61
62                let (version, params) = block_to_new_payload(block, is_optimism).unwrap();
63                let head_block_hash = header.hash;
64                let safe_block_hash =
65                    block_provider.get_block_by_number(header.number.saturating_sub(32).into());
66
67                let finalized_block_hash =
68                    block_provider.get_block_by_number(header.number.saturating_sub(64).into());
69
70                let (safe, finalized) = tokio::join!(safe_block_hash, finalized_block_hash,);
71
72                let safe_block_hash = safe.unwrap().expect("finalized block exists").header.hash;
73                let finalized_block_hash =
74                    finalized.unwrap().expect("finalized block exists").header.hash;
75
76                next_block += 1;
77                sender
78                    .send((
79                        header,
80                        version,
81                        params,
82                        head_block_hash,
83                        safe_block_hash,
84                        finalized_block_hash,
85                    ))
86                    .await
87                    .unwrap();
88            }
89        });
90
91        // put results in a summary vec so they can be printed at the end
92        let mut results = Vec::new();
93        let total_benchmark_duration = Instant::now();
94        let mut total_wait_time = Duration::ZERO;
95
96        while let Some((header, version, params, head, safe, finalized)) = {
97            let wait_start = Instant::now();
98            let result = receiver.recv().await;
99            total_wait_time += wait_start.elapsed();
100            result
101        } {
102            // just put gas used here
103            let gas_used = header.gas_used;
104            let block_number = header.number;
105
106            debug!(target: "reth-bench", ?block_number, "Sending payload",);
107
108            // construct fcu to call
109            let forkchoice_state = ForkchoiceState {
110                head_block_hash: head,
111                safe_block_hash: safe,
112                finalized_block_hash: finalized,
113            };
114
115            let start = Instant::now();
116            call_new_payload(&auth_provider, version, params).await?;
117
118            let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
119
120            call_forkchoice_updated(&auth_provider, version, forkchoice_state, None).await?;
121
122            // calculate the total duration and the fcu latency, record
123            let total_latency = start.elapsed();
124            let fcu_latency = total_latency - new_payload_result.latency;
125            let combined_result =
126                CombinedResult { block_number, new_payload_result, fcu_latency, total_latency };
127
128            // current duration since the start of the benchmark minus the time
129            // waiting for blocks
130            let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
131
132            // convert gas used to gigagas, then compute gigagas per second
133            info!(%combined_result);
134
135            // wait if we need to
136            if let Some(wait_time) = self.wait_time {
137                tokio::time::sleep(wait_time).await;
138            }
139
140            // record the current result
141            let gas_row = TotalGasRow { block_number, gas_used, time: current_duration };
142            results.push((gas_row, combined_result));
143        }
144
145        let (gas_output_results, combined_results): (_, Vec<CombinedResult>) =
146            results.into_iter().unzip();
147
148        // write the csv output to files
149        if let Some(path) = self.benchmark.output {
150            // first write the combined results to a file
151            let output_path = path.join(COMBINED_OUTPUT_SUFFIX);
152            info!("Writing engine api call latency output to file: {:?}", output_path);
153            let mut writer = Writer::from_path(output_path)?;
154            for result in combined_results {
155                writer.serialize(result)?;
156            }
157            writer.flush()?;
158
159            // now write the gas output to a file
160            let output_path = path.join(GAS_OUTPUT_SUFFIX);
161            info!("Writing total gas output to file: {:?}", output_path);
162            let mut writer = Writer::from_path(output_path)?;
163            for row in &gas_output_results {
164                writer.serialize(row)?;
165            }
166            writer.flush()?;
167
168            info!("Finished writing benchmark output files to {:?}.", path);
169        }
170
171        // accumulate the results and calculate the overall Ggas/s
172        let gas_output = TotalGasOutput::new(gas_output_results);
173        info!(
174            total_duration=?gas_output.total_duration,
175            total_gas_used=?gas_output.total_gas_used,
176            blocks_processed=?gas_output.blocks_processed,
177            "Total Ggas/s: {:.4}",
178            gas_output.total_gigagas_per_second()
179        );
180
181        Ok(())
182    }
183}