reth_bench/bench/
new_payload_fcu.rs
1use crate::{
5 bench::{
6 context::BenchContext,
7 output::{
8 CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow, COMBINED_OUTPUT_SUFFIX,
9 GAS_OUTPUT_SUFFIX,
10 },
11 },
12 valid_payload::{call_forkchoice_updated, call_new_payload},
13};
14use alloy_provider::Provider;
15use alloy_rpc_types_engine::{ExecutionPayload, ForkchoiceState};
16use clap::Parser;
17use csv::Writer;
18use reth_cli_runner::CliContext;
19use reth_node_core::args::BenchmarkArgs;
20use std::time::{Duration, Instant};
21use tracing::{debug, info};
22
23#[derive(Debug, Parser)]
25pub struct Command {
26 #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
28 rpc_url: String,
29
30 #[command(flatten)]
31 benchmark: BenchmarkArgs,
32}
33
34impl Command {
35 pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
37 let BenchContext { benchmark_mode, block_provider, auth_provider, mut next_block } =
38 BenchContext::new(&self.benchmark, self.rpc_url).await?;
39
40 let (sender, mut receiver) = tokio::sync::mpsc::channel(1000);
41 tokio::task::spawn(async move {
42 while benchmark_mode.contains(next_block) {
43 let block_res = block_provider.get_block_by_number(next_block.into()).full().await;
44 let block = block_res.unwrap().unwrap();
45
46 let block = block
47 .into_inner()
48 .map_header(|header| header.map(|h| h.into_header_with_defaults()))
49 .try_map_transactions(|tx| {
50 tx.try_into_either::<op_alloy_consensus::OpTxEnvelope>()
52 })
53 .unwrap()
54 .into_consensus();
55
56 let blob_versioned_hashes =
57 block.body.blob_versioned_hashes_iter().copied().collect::<Vec<_>>();
58
59 let payload = ExecutionPayload::from_block_slow(&block).0;
61 let header = block.header;
62 let head_block_hash = payload.block_hash();
63 let safe_block_hash =
64 block_provider.get_block_by_number(header.number.saturating_sub(32).into());
65
66 let finalized_block_hash =
67 block_provider.get_block_by_number(header.number.saturating_sub(64).into());
68
69 let (safe, finalized) = tokio::join!(safe_block_hash, finalized_block_hash,);
70
71 let safe_block_hash = safe.unwrap().expect("finalized block exists").header.hash;
72 let finalized_block_hash =
73 finalized.unwrap().expect("finalized block exists").header.hash;
74
75 next_block += 1;
76 sender
77 .send((
78 header,
79 blob_versioned_hashes,
80 payload,
81 head_block_hash,
82 safe_block_hash,
83 finalized_block_hash,
84 ))
85 .await
86 .unwrap();
87 }
88 });
89
90 let mut results = Vec::new();
92 let total_benchmark_duration = Instant::now();
93 let mut total_wait_time = Duration::ZERO;
94
95 while let Some((header, versioned_hashes, payload, head, safe, finalized)) = {
96 let wait_start = Instant::now();
97 let result = receiver.recv().await;
98 total_wait_time += wait_start.elapsed();
99 result
100 } {
101 let gas_used = header.gas_used;
103 let block_number = header.number;
104
105 debug!(target: "reth-bench", ?block_number, "Sending payload",);
106
107 let forkchoice_state = ForkchoiceState {
109 head_block_hash: head,
110 safe_block_hash: safe,
111 finalized_block_hash: finalized,
112 };
113
114 let start = Instant::now();
115 let message_version = call_new_payload(
116 &auth_provider,
117 payload,
118 header.parent_beacon_block_root,
119 versioned_hashes,
120 )
121 .await?;
122
123 let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
124
125 call_forkchoice_updated(&auth_provider, message_version, forkchoice_state, None)
126 .await?;
127
128 let total_latency = start.elapsed();
130 let fcu_latency = total_latency - new_payload_result.latency;
131 let combined_result =
132 CombinedResult { block_number, new_payload_result, fcu_latency, total_latency };
133
134 let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
137
138 info!(%combined_result);
140
141 let gas_row = TotalGasRow { block_number, gas_used, time: current_duration };
143 results.push((gas_row, combined_result));
144 }
145
146 let (gas_output_results, combined_results): (_, Vec<CombinedResult>) =
147 results.into_iter().unzip();
148
149 if let Some(path) = self.benchmark.output {
151 let output_path = path.join(COMBINED_OUTPUT_SUFFIX);
153 info!("Writing engine api call latency output to file: {:?}", output_path);
154 let mut writer = Writer::from_path(output_path)?;
155 for result in combined_results {
156 writer.serialize(result)?;
157 }
158 writer.flush()?;
159
160 let output_path = path.join(GAS_OUTPUT_SUFFIX);
162 info!("Writing total gas output to file: {:?}", output_path);
163 let mut writer = Writer::from_path(output_path)?;
164 for row in &gas_output_results {
165 writer.serialize(row)?;
166 }
167 writer.flush()?;
168
169 info!("Finished writing benchmark output files to {:?}.", path);
170 }
171
172 let gas_output = TotalGasOutput::new(gas_output_results);
174 info!(
175 total_duration=?gas_output.total_duration,
176 total_gas_used=?gas_output.total_gas_used,
177 blocks_processed=?gas_output.blocks_processed,
178 "Total Ggas/s: {:.4}",
179 gas_output.total_gigagas_per_second()
180 );
181
182 Ok(())
183 }
184}