Skip to main content

reth_bench/bench/
new_payload_fcu.rs

1//! Runs the `reth bench` command, calling first newPayload for each block, then calling
2//! forkchoiceUpdated.
3//!
4//! Supports configurable waiting behavior:
5//! - **`--wait-time`**: Fixed sleep interval between blocks.
6//! - **`--wait-for-persistence`**: Waits for every Nth block to be persisted using the
7//!   `reth_subscribePersistedBlock` subscription, where N matches the engine's persistence
8//!   threshold. This ensures the benchmark doesn't outpace persistence.
9//!
10//! Both options can be used together or independently.
11
12use crate::{
13    bench::{
14        context::BenchContext,
15        helpers::parse_duration,
16        output::{
17            write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
18        },
19        persistence_waiter::{
20            derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
21        },
22    },
23    valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload},
24};
25use alloy_provider::Provider;
26use alloy_rpc_types_engine::ForkchoiceState;
27use clap::Parser;
28use eyre::{Context, OptionExt};
29use reth_cli_runner::CliContext;
30use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
31use reth_node_core::args::BenchmarkArgs;
32use std::time::{Duration, Instant};
33use tracing::{debug, info};
34
// NOTE: the `///` comments on this struct and its fields are picked up by the `clap` derive
// (`verbatim_doc_comment`), so editing them changes the CLI help output. Maintainer notes
// therefore use plain `//` comments.
/// `reth benchmark new-payload-fcu` command
#[derive(Debug, Parser)]
pub struct Command {
    /// The RPC url to use for getting data.
    #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
    // Handed to `BenchContext::new` in `execute`.
    rpc_url: String,

    /// How long to wait after a forkchoice update before sending the next payload.
    ///
    /// Accepts a duration string (e.g. `100ms`, `2s`) or a bare integer treated as
    /// milliseconds (e.g. `400`).
    #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
    // `None` when the flag is omitted; `execute` then builds no duration-based waiter.
    wait_time: Option<Duration>,

    /// Wait for blocks to be persisted before sending the next batch.
    ///
    /// When enabled, waits for every Nth block to be persisted using the
    /// `reth_subscribePersistedBlock` subscription. This ensures the benchmark
    /// doesn't outpace persistence.
    ///
    /// The subscription uses the regular RPC websocket endpoint (no JWT required).
    #[arg(long, default_value = "false", verbatim_doc_comment)]
    // When set, `execute` derives a websocket URL from the benchmark args and opens the
    // persistence subscription before the benchmark starts.
    wait_for_persistence: bool,

    /// Engine persistence threshold used for deciding when to wait for persistence.
    ///
    /// The benchmark waits after every `(threshold + 1)` blocks. By default this
    /// matches the engine's `DEFAULT_PERSISTENCE_THRESHOLD` (2), so waits occur
    /// at blocks 3, 6, 9, etc.
    #[arg(
        long = "persistence-threshold",
        value_name = "PERSISTENCE_THRESHOLD",
        default_value_t = DEFAULT_PERSISTENCE_THRESHOLD,
        verbatim_doc_comment
    )]
    // Only forwarded to `PersistenceWaiter` when `wait_for_persistence` is set.
    persistence_threshold: u64,

    /// Timeout for waiting on persistence at each checkpoint.
    ///
    /// Must be long enough to account for the persistence thread being blocked
    /// by pruning after the previous save.
    #[arg(
        long = "persistence-timeout",
        value_name = "PERSISTENCE_TIMEOUT",
        value_parser = parse_duration,
        default_value = "120s",
        verbatim_doc_comment
    )]
    // Passed both to `setup_persistence_subscription` and to the `PersistenceWaiter`
    // constructors in `execute`.
    persistence_timeout: Duration,

    /// The size of the block buffer (channel capacity) for prefetching blocks from the RPC
    /// endpoint.
    #[arg(
        long = "rpc-block-buffer-size",
        value_name = "RPC_BLOCK_BUFFER_SIZE",
        default_value = "20",
        verbatim_doc_comment
    )]
    // Capacity of the bounded `tokio` mpsc channel between the block-fetch task and the
    // benchmark loop in `execute`.
    rpc_block_buffer_size: usize,

    // Shared benchmark arguments flattened into this command's CLI. `execute` reads
    // `ws_rpc_url`, `engine_rpc_url` and `output` from it and passes the whole struct to
    // `BenchContext::new`.
    #[command(flatten)]
    benchmark: BenchmarkArgs,
}
98
99impl Command {
100    /// Execute `benchmark new-payload-fcu` command
101    pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
102        // Log mode configuration
103        if let Some(duration) = self.wait_time {
104            info!(target: "reth-bench", "Using wait-time mode with {}ms delay between blocks", duration.as_millis());
105        }
106        if self.wait_for_persistence {
107            info!(
108                target: "reth-bench",
109                "Persistence waiting enabled (waits after every {} blocks to match engine gap > {} behavior)",
110                self.persistence_threshold + 1,
111                self.persistence_threshold
112            );
113        }
114
115        // Set up waiter based on configured options
116        // When both are set: wait at least wait_time, and also wait for persistence if needed
117        let mut waiter = match (self.wait_time, self.wait_for_persistence) {
118            (Some(duration), true) => {
119                let ws_url = derive_ws_rpc_url(
120                    self.benchmark.ws_rpc_url.as_deref(),
121                    &self.benchmark.engine_rpc_url,
122                )?;
123                let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
124                Some(PersistenceWaiter::with_duration_and_subscription(
125                    duration,
126                    sub,
127                    self.persistence_threshold,
128                    self.persistence_timeout,
129                ))
130            }
131            (Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)),
132            (None, true) => {
133                let ws_url = derive_ws_rpc_url(
134                    self.benchmark.ws_rpc_url.as_deref(),
135                    &self.benchmark.engine_rpc_url,
136                )?;
137                let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
138                Some(PersistenceWaiter::with_subscription(
139                    sub,
140                    self.persistence_threshold,
141                    self.persistence_timeout,
142                ))
143            }
144            (None, false) => None,
145        };
146
147        let BenchContext {
148            benchmark_mode,
149            block_provider,
150            auth_provider,
151            mut next_block,
152            is_optimism,
153            ..
154        } = BenchContext::new(&self.benchmark, self.rpc_url).await?;
155
156        let total_blocks = benchmark_mode.total_blocks();
157        let buffer_size = self.rpc_block_buffer_size;
158
159        // Use a oneshot channel to propagate errors from the spawned task
160        let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel();
161        let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size);
162
163        tokio::task::spawn(async move {
164            while benchmark_mode.contains(next_block) {
165                let block_res = block_provider
166                    .get_block_by_number(next_block.into())
167                    .full()
168                    .await
169                    .wrap_err_with(|| format!("Failed to fetch block by number {next_block}"));
170                let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) {
171                    Ok(block) => block,
172                    Err(e) => {
173                        tracing::error!(target: "reth-bench", "Failed to fetch block {next_block}: {e}");
174                        let _ = error_sender.send(e);
175                        break;
176                    }
177                };
178
179                let head_block_hash = block.header.hash;
180                let safe_block_hash = block_provider
181                    .get_block_by_number(block.header.number.saturating_sub(32).into());
182
183                let finalized_block_hash = block_provider
184                    .get_block_by_number(block.header.number.saturating_sub(64).into());
185
186                let (safe, finalized) = tokio::join!(safe_block_hash, finalized_block_hash,);
187
188                let safe_block_hash = match safe {
189                    Ok(Some(block)) => block.header.hash,
190                    Ok(None) | Err(_) => head_block_hash,
191                };
192
193                let finalized_block_hash = match finalized {
194                    Ok(Some(block)) => block.header.hash,
195                    Ok(None) | Err(_) => head_block_hash,
196                };
197
198                next_block += 1;
199                if let Err(e) = sender
200                    .send((block, head_block_hash, safe_block_hash, finalized_block_hash))
201                    .await
202                {
203                    tracing::error!(target: "reth-bench", "Failed to send block data: {e}");
204                    break;
205                }
206            }
207        });
208
209        let mut results = Vec::new();
210        let mut blocks_processed = 0u64;
211        let total_benchmark_duration = Instant::now();
212        let mut total_wait_time = Duration::ZERO;
213
214        while let Some((block, head, safe, finalized)) = {
215            let wait_start = Instant::now();
216            let result = receiver.recv().await;
217            total_wait_time += wait_start.elapsed();
218            result
219        } {
220            let gas_used = block.header.gas_used;
221            let gas_limit = block.header.gas_limit;
222            let block_number = block.header.number;
223            let transaction_count = block.transactions.len() as u64;
224
225            debug!(target: "reth-bench", ?block_number, "Sending payload");
226
227            let forkchoice_state = ForkchoiceState {
228                head_block_hash: head,
229                safe_block_hash: safe,
230                finalized_block_hash: finalized,
231            };
232
233            let (version, params) = block_to_new_payload(block, is_optimism)?;
234            let start = Instant::now();
235            call_new_payload(&auth_provider, version, params).await?;
236
237            let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
238
239            call_forkchoice_updated(&auth_provider, version, forkchoice_state, None).await?;
240
241            let total_latency = start.elapsed();
242            let fcu_latency = total_latency - new_payload_result.latency;
243            let combined_result = CombinedResult {
244                block_number,
245                gas_limit,
246                transaction_count,
247                new_payload_result,
248                fcu_latency,
249                total_latency,
250            };
251
252            // Exclude time spent waiting on the block prefetch channel from the benchmark duration.
253            // We want to measure engine throughput, not RPC fetch latency.
254            blocks_processed += 1;
255            let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
256            let progress = match total_blocks {
257                Some(total) => format!("{blocks_processed}/{total}"),
258                None => format!("{blocks_processed}"),
259            };
260            info!(target: "reth-bench", progress, %combined_result);
261
262            if let Some(w) = &mut waiter {
263                w.on_block(block_number).await?;
264            }
265
266            let gas_row =
267                TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
268            results.push((gas_row, combined_result));
269        }
270
271        // Check if the spawned task encountered an error
272        if let Ok(error) = error_receiver.try_recv() {
273            return Err(error);
274        }
275
276        // Drop waiter - we don't need to wait for final blocks to persist
277        // since the benchmark goal is measuring Ggas/s of newPayload/FCU, not persistence.
278        drop(waiter);
279
280        let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
281            results.into_iter().unzip();
282
283        if let Some(ref path) = self.benchmark.output {
284            write_benchmark_results(path, &gas_output_results, &combined_results)?;
285        }
286
287        let gas_output =
288            TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
289
290        info!(
291            target: "reth-bench",
292            total_gas_used = gas_output.total_gas_used,
293            total_duration = ?gas_output.total_duration,
294            execution_duration = ?gas_output.execution_duration,
295            blocks_processed = gas_output.blocks_processed,
296            wall_clock_ggas_per_second = format_args!("{:.4}", gas_output.total_gigagas_per_second()),
297            execution_ggas_per_second = format_args!("{:.4}", gas_output.execution_gigagas_per_second()),
298            "Benchmark complete"
299        );
300
301        Ok(())
302    }
303}