reth_bench/bench/
new_payload_fcu.rs

1//! Runs the `reth bench` command, calling first newPayload for each block, then calling
2//! forkchoiceUpdated.
3//!
4//! Supports configurable waiting behavior:
5//! - **`--wait-time`**: Fixed sleep interval between blocks.
6//! - **`--wait-for-persistence`**: Waits for every Nth block to be persisted using the
7//!   `reth_subscribePersistedBlock` subscription, where N matches the engine's persistence
8//!   threshold. This ensures the benchmark doesn't outpace persistence.
9//!
10//! Both options can be used together or independently.
11
12use crate::{
13    bench::{
14        context::BenchContext,
15        output::{
16            write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
17        },
18    },
19    valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload},
20};
21use alloy_eips::BlockNumHash;
22use alloy_network::Ethereum;
23use alloy_provider::{Provider, RootProvider};
24use alloy_pubsub::SubscriptionStream;
25use alloy_rpc_client::RpcClient;
26use alloy_rpc_types_engine::ForkchoiceState;
27use alloy_transport_ws::WsConnect;
28use clap::Parser;
29use eyre::{Context, OptionExt};
30use futures::StreamExt;
31use humantime::parse_duration;
32use reth_cli_runner::CliContext;
33use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
34use reth_node_core::args::BenchmarkArgs;
35use std::time::{Duration, Instant};
36use tracing::{debug, info};
37use url::Url;
38
39const PERSISTENCE_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(60);
40
41/// `reth benchmark new-payload-fcu` command
42#[derive(Debug, Parser)]
43pub struct Command {
44    /// The RPC url to use for getting data.
45    #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
46    rpc_url: String,
47
48    /// How long to wait after a forkchoice update before sending the next payload.
49    #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
50    wait_time: Option<Duration>,
51
52    /// Wait for blocks to be persisted before sending the next batch.
53    ///
54    /// When enabled, waits for every Nth block to be persisted using the
55    /// `reth_subscribePersistedBlock` subscription. This ensures the benchmark
56    /// doesn't outpace persistence.
57    ///
58    /// The subscription uses the regular RPC websocket endpoint (no JWT required).
59    #[arg(long, default_value = "false", verbatim_doc_comment)]
60    wait_for_persistence: bool,
61
62    /// Engine persistence threshold used for deciding when to wait for persistence.
63    ///
64    /// The benchmark waits after every `(threshold + 1)` blocks. By default this
65    /// matches the engine's `DEFAULT_PERSISTENCE_THRESHOLD` (2), so waits occur
66    /// at blocks 3, 6, 9, etc.
67    #[arg(
68        long = "persistence-threshold",
69        value_name = "PERSISTENCE_THRESHOLD",
70        default_value_t = DEFAULT_PERSISTENCE_THRESHOLD,
71        verbatim_doc_comment
72    )]
73    persistence_threshold: u64,
74
75    /// The size of the block buffer (channel capacity) for prefetching blocks from the RPC
76    /// endpoint.
77    #[arg(
78        long = "rpc-block-buffer-size",
79        value_name = "RPC_BLOCK_BUFFER_SIZE",
80        default_value = "20",
81        verbatim_doc_comment
82    )]
83    rpc_block_buffer_size: usize,
84
85    #[command(flatten)]
86    benchmark: BenchmarkArgs,
87}
88
89impl Command {
90    /// Execute `benchmark new-payload-fcu` command
91    pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
92        // Log mode configuration
93        if let Some(duration) = self.wait_time {
94            info!("Using wait-time mode with {}ms delay between blocks", duration.as_millis());
95        }
96        if self.wait_for_persistence {
97            info!(
98                "Persistence waiting enabled (waits after every {} blocks to match engine gap > {} behavior)",
99                self.persistence_threshold + 1,
100                self.persistence_threshold
101            );
102        }
103
104        // Set up waiter based on configured options (duration takes precedence)
105        let mut waiter = match (self.wait_time, self.wait_for_persistence) {
106            (Some(duration), _) => Some(PersistenceWaiter::with_duration(duration)),
107            (None, true) => {
108                let sub = self.setup_persistence_subscription().await?;
109                Some(PersistenceWaiter::with_subscription(
110                    sub,
111                    self.persistence_threshold,
112                    PERSISTENCE_CHECKPOINT_TIMEOUT,
113                ))
114            }
115            (None, false) => None,
116        };
117
118        let BenchContext {
119            benchmark_mode,
120            block_provider,
121            auth_provider,
122            mut next_block,
123            is_optimism,
124            ..
125        } = BenchContext::new(&self.benchmark, self.rpc_url).await?;
126
127        let buffer_size = self.rpc_block_buffer_size;
128
129        // Use a oneshot channel to propagate errors from the spawned task
130        let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel();
131        let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size);
132
133        tokio::task::spawn(async move {
134            while benchmark_mode.contains(next_block) {
135                let block_res = block_provider
136                    .get_block_by_number(next_block.into())
137                    .full()
138                    .await
139                    .wrap_err_with(|| format!("Failed to fetch block by number {next_block}"));
140                let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) {
141                    Ok(block) => block,
142                    Err(e) => {
143                        tracing::error!("Failed to fetch block {next_block}: {e}");
144                        let _ = error_sender.send(e);
145                        break;
146                    }
147                };
148
149                let head_block_hash = block.header.hash;
150                let safe_block_hash = block_provider
151                    .get_block_by_number(block.header.number.saturating_sub(32).into());
152
153                let finalized_block_hash = block_provider
154                    .get_block_by_number(block.header.number.saturating_sub(64).into());
155
156                let (safe, finalized) = tokio::join!(safe_block_hash, finalized_block_hash,);
157
158                let safe_block_hash = match safe {
159                    Ok(Some(block)) => block.header.hash,
160                    Ok(None) | Err(_) => head_block_hash,
161                };
162
163                let finalized_block_hash = match finalized {
164                    Ok(Some(block)) => block.header.hash,
165                    Ok(None) | Err(_) => head_block_hash,
166                };
167
168                next_block += 1;
169                if let Err(e) = sender
170                    .send((block, head_block_hash, safe_block_hash, finalized_block_hash))
171                    .await
172                {
173                    tracing::error!("Failed to send block data: {e}");
174                    break;
175                }
176            }
177        });
178
179        let mut results = Vec::new();
180        let total_benchmark_duration = Instant::now();
181        let mut total_wait_time = Duration::ZERO;
182
183        while let Some((block, head, safe, finalized)) = {
184            let wait_start = Instant::now();
185            let result = receiver.recv().await;
186            total_wait_time += wait_start.elapsed();
187            result
188        } {
189            let gas_used = block.header.gas_used;
190            let gas_limit = block.header.gas_limit;
191            let block_number = block.header.number;
192            let transaction_count = block.transactions.len() as u64;
193
194            debug!(target: "reth-bench", ?block_number, "Sending payload");
195
196            let forkchoice_state = ForkchoiceState {
197                head_block_hash: head,
198                safe_block_hash: safe,
199                finalized_block_hash: finalized,
200            };
201
202            let (version, params) = block_to_new_payload(block, is_optimism)?;
203            let start = Instant::now();
204            call_new_payload(&auth_provider, version, params).await?;
205
206            let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
207
208            call_forkchoice_updated(&auth_provider, version, forkchoice_state, None).await?;
209
210            let total_latency = start.elapsed();
211            let fcu_latency = total_latency - new_payload_result.latency;
212            let combined_result = CombinedResult {
213                block_number,
214                gas_limit,
215                transaction_count,
216                new_payload_result,
217                fcu_latency,
218                total_latency,
219            };
220
221            // Exclude time spent waiting on the block prefetch channel from the benchmark duration.
222            // We want to measure engine throughput, not RPC fetch latency.
223            let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
224            info!(%combined_result);
225
226            if let Some(w) = &mut waiter {
227                w.on_block(block_number).await?;
228            }
229
230            let gas_row =
231                TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
232            results.push((gas_row, combined_result));
233        }
234
235        // Check if the spawned task encountered an error
236        if let Ok(error) = error_receiver.try_recv() {
237            return Err(error);
238        }
239
240        // Drop waiter - we don't need to wait for final blocks to persist
241        // since the benchmark goal is measuring Ggas/s of newPayload/FCU, not persistence.
242        drop(waiter);
243
244        let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
245            results.into_iter().unzip();
246
247        if let Some(ref path) = self.benchmark.output {
248            write_benchmark_results(path, &gas_output_results, combined_results)?;
249        }
250
251        let gas_output = TotalGasOutput::new(gas_output_results)?;
252
253        info!(
254            total_duration=?gas_output.total_duration,
255            total_gas_used=?gas_output.total_gas_used,
256            blocks_processed=?gas_output.blocks_processed,
257            "Total Ggas/s: {:.4}",
258            gas_output.total_gigagas_per_second()
259        );
260
261        Ok(())
262    }
263
264    /// Returns the websocket RPC URL used for the persistence subscription.
265    ///
266    /// Preference:
267    /// - If `--ws-rpc-url` is provided, use it directly.
268    /// - Otherwise, derive a WS RPC URL from `--engine-rpc-url`.
269    ///
270    /// The persistence subscription endpoint (`reth_subscribePersistedBlock`) is exposed on
271    /// the regular RPC server (WS port, usually 8546), not on the engine API port (usually 8551).
272    /// Since `BenchmarkArgs` only has the engine URL by default, we convert the scheme
273    /// (http→ws, https→wss) and force the port to 8546.
274    fn derive_ws_rpc_url(&self) -> eyre::Result<Url> {
275        if let Some(ref ws_url) = self.benchmark.ws_rpc_url {
276            let parsed: Url = ws_url
277                .parse()
278                .wrap_err_with(|| format!("Failed to parse WebSocket RPC URL: {ws_url}"))?;
279            info!(target: "reth-bench", ws_url = %parsed, "Using provided WebSocket RPC URL");
280            Ok(parsed)
281        } else {
282            let derived = engine_url_to_ws_url(&self.benchmark.engine_rpc_url)?;
283            debug!(
284                target: "reth-bench",
285                engine_url = %self.benchmark.engine_rpc_url,
286                %derived,
287                "Derived WebSocket RPC URL from engine RPC URL"
288            );
289            Ok(derived)
290        }
291    }
292
293    /// Establishes a websocket connection and subscribes to `reth_subscribePersistedBlock`.
294    async fn setup_persistence_subscription(&self) -> eyre::Result<PersistenceSubscription> {
295        let ws_url = self.derive_ws_rpc_url()?;
296
297        info!("Connecting to WebSocket at {} for persistence subscription", ws_url);
298
299        let ws_connect = WsConnect::new(ws_url.to_string());
300        let client = RpcClient::connect_pubsub(ws_connect)
301            .await
302            .wrap_err("Failed to connect to WebSocket RPC endpoint")?;
303        let provider: RootProvider<Ethereum> = RootProvider::new(client);
304
305        let subscription = provider
306            .subscribe_to::<BlockNumHash>("reth_subscribePersistedBlock")
307            .await
308            .wrap_err("Failed to subscribe to persistence notifications")?;
309
310        info!("Subscribed to persistence notifications");
311
312        Ok(PersistenceSubscription::new(provider, subscription.into_stream()))
313    }
314}
315
316/// Converts an engine API URL to the default RPC websocket URL.
317///
318/// Transformations:
319/// - `http`  → `ws`
320/// - `https` → `wss`
321/// - `ws` / `wss` keep their scheme
322/// - Port is always set to `8546`, reth's default RPC websocket port.
323///
324/// This is used when we only know the engine API URL (typically `:8551`) but
325/// need to connect to the node's WS RPC endpoint for persistence events.
326fn engine_url_to_ws_url(engine_url: &str) -> eyre::Result<Url> {
327    let url: Url = engine_url
328        .parse()
329        .wrap_err_with(|| format!("Failed to parse engine RPC URL: {engine_url}"))?;
330
331    let mut ws_url = url.clone();
332
333    match ws_url.scheme() {
334        "http" => ws_url
335            .set_scheme("ws")
336            .map_err(|_| eyre::eyre!("Failed to set WS scheme for URL: {url}"))?,
337        "https" => ws_url
338            .set_scheme("wss")
339            .map_err(|_| eyre::eyre!("Failed to set WSS scheme for URL: {url}"))?,
340        "ws" | "wss" => {}
341        scheme => {
342            return Err(eyre::eyre!(
343            "Unsupported URL scheme '{scheme}' for URL: {url}. Expected http, https, ws, or wss."
344        ))
345        }
346    }
347
348    ws_url.set_port(Some(8546)).map_err(|_| eyre::eyre!("Failed to set port for URL: {url}"))?;
349
350    Ok(ws_url)
351}
352
353/// Waits until the persistence subscription reports that `target` has been persisted.
354///
355/// Consumes subscription events until `last_persisted >= target`, or returns an error if:
356/// - the subscription stream ends unexpectedly, or
357/// - `timeout` elapses before `target` is observed.
358async fn wait_for_persistence(
359    stream: &mut SubscriptionStream<BlockNumHash>,
360    target: u64,
361    last_persisted: &mut u64,
362    timeout: Duration,
363) -> eyre::Result<()> {
364    tokio::time::timeout(timeout, async {
365        while *last_persisted < target {
366            match stream.next().await {
367                Some(persisted) => {
368                    *last_persisted = persisted.number;
369                    debug!(
370                        target: "reth-bench",
371                        persisted_block = ?last_persisted,
372                        "Received persistence notification"
373                    );
374                }
375                None => {
376                    return Err(eyre::eyre!("Persistence subscription closed unexpectedly"));
377                }
378            }
379        }
380        Ok(())
381    })
382    .await
383    .map_err(|_| {
384        eyre::eyre!(
385            "Persistence timeout: target block {} not persisted within {:?}. Last persisted: {}",
386            target,
387            timeout,
388            last_persisted
389        )
390    })?
391}
392
393/// Wrapper that keeps both the subscription stream and the underlying provider alive.
394/// The provider must be kept alive for the subscription to continue receiving events.
395struct PersistenceSubscription {
396    _provider: RootProvider<Ethereum>,
397    stream: SubscriptionStream<BlockNumHash>,
398}
399
400impl PersistenceSubscription {
401    const fn new(
402        provider: RootProvider<Ethereum>,
403        stream: SubscriptionStream<BlockNumHash>,
404    ) -> Self {
405        Self { _provider: provider, stream }
406    }
407
408    const fn stream_mut(&mut self) -> &mut SubscriptionStream<BlockNumHash> {
409        &mut self.stream
410    }
411}
412
413/// Encapsulates the block waiting logic.
414///
415/// Provides a simple `on_block()` interface that handles both:
416/// - Fixed duration waits (when `wait_time` is set)
417/// - Persistence-based waits (when `subscription` is set)
418///
419/// For persistence mode, waits after every `(threshold + 1)` blocks.
420struct PersistenceWaiter {
421    wait_time: Option<Duration>,
422    subscription: Option<PersistenceSubscription>,
423    blocks_sent: u64,
424    last_persisted: u64,
425    threshold: u64,
426    timeout: Duration,
427}
428
429impl PersistenceWaiter {
430    const fn with_duration(wait_time: Duration) -> Self {
431        Self {
432            wait_time: Some(wait_time),
433            subscription: None,
434            blocks_sent: 0,
435            last_persisted: 0,
436            threshold: 0,
437            timeout: Duration::ZERO,
438        }
439    }
440
441    const fn with_subscription(
442        subscription: PersistenceSubscription,
443        threshold: u64,
444        timeout: Duration,
445    ) -> Self {
446        Self {
447            wait_time: None,
448            subscription: Some(subscription),
449            blocks_sent: 0,
450            last_persisted: 0,
451            threshold,
452            timeout,
453        }
454    }
455
456    /// Called once per block. Waits based on the configured mode.
457    #[allow(clippy::manual_is_multiple_of)]
458    async fn on_block(&mut self, block_number: u64) -> eyre::Result<()> {
459        if let Some(wait_time) = self.wait_time {
460            tokio::time::sleep(wait_time).await;
461            return Ok(());
462        }
463
464        let Some(ref mut subscription) = self.subscription else {
465            return Ok(());
466        };
467
468        self.blocks_sent += 1;
469
470        if self.blocks_sent % (self.threshold + 1) == 0 {
471            debug!(
472                target: "reth-bench",
473                target_block = ?block_number,
474                last_persisted = self.last_persisted,
475                blocks_sent = self.blocks_sent,
476                "Waiting for persistence"
477            );
478
479            wait_for_persistence(
480                subscription.stream_mut(),
481                block_number,
482                &mut self.last_persisted,
483                self.timeout,
484            )
485            .await?;
486
487            debug!(
488                target: "reth-bench",
489                persisted = self.last_persisted,
490                "Persistence caught up"
491            );
492        }
493
494        Ok(())
495    }
496}
497
498#[cfg(test)]
499mod tests {
500    use super::*;
501
502    #[test]
503    fn test_engine_url_to_ws_url() {
504        // http -> ws, always uses port 8546
505        let result = engine_url_to_ws_url("http://localhost:8551").unwrap();
506        assert_eq!(result.as_str(), "ws://localhost:8546/");
507
508        // https -> wss
509        let result = engine_url_to_ws_url("https://localhost:8551").unwrap();
510        assert_eq!(result.as_str(), "wss://localhost:8546/");
511
512        // Custom engine port still maps to 8546
513        let result = engine_url_to_ws_url("http://localhost:9551").unwrap();
514        assert_eq!(result.port(), Some(8546));
515
516        // Already ws passthrough
517        let result = engine_url_to_ws_url("ws://localhost:8546").unwrap();
518        assert_eq!(result.scheme(), "ws");
519
520        // Invalid inputs
521        assert!(engine_url_to_ws_url("ftp://localhost:8551").is_err());
522        assert!(engine_url_to_ws_url("not a valid url").is_err());
523    }
524
525    #[tokio::test]
526    async fn test_waiter_with_duration() {
527        let mut waiter = PersistenceWaiter::with_duration(Duration::from_millis(1));
528
529        let start = Instant::now();
530        waiter.on_block(1).await.unwrap();
531        waiter.on_block(2).await.unwrap();
532        waiter.on_block(3).await.unwrap();
533
534        // Should have waited ~3ms total
535        assert!(start.elapsed() >= Duration::from_millis(3));
536    }
537}