Skip to main content

reth_bench/bench/
new_payload_fcu.rs

1//! Runs the `reth bench` command, calling first newPayload for each block, then calling
2//! forkchoiceUpdated.
3
4use crate::{
5    bench::{
6        context::BenchContext,
7        generate_big_block::big_blocks_stream,
8        helpers::{fetch_block_access_list, parse_duration},
9        metrics_scraper::MetricsScraper,
10        output::{
11            write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
12        },
13    },
14    valid_payload::{
15        block_to_new_payload, call_forkchoice_updated_with_reth, call_new_payload_with_reth,
16    },
17};
18use alloy_consensus::TxEnvelope;
19use alloy_eip7928::BlockAccessList;
20use alloy_eips::Encodable2718;
21use alloy_primitives::{Bytes, B256};
22use alloy_provider::{
23    ext::DebugApi,
24    network::{AnyNetwork, AnyRpcBlock},
25    Provider, RootProvider,
26};
27use alloy_rpc_types_engine::{
28    ExecutionData, ExecutionPayloadEnvelopeV5, ForkchoiceState, PayloadAttributes,
29};
30use clap::Parser;
31use eyre::{bail, ensure, Context, OptionExt};
32use futures::{stream, Stream, StreamExt, TryStreamExt};
33use reth_cli_runner::CliContext;
34use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
35use reth_node_api::{BigBlockData, EngineApiMessageVersion, ExecutionPayload};
36use reth_node_core::args::{BenchmarkArgs, WaitForPersistence};
37use reth_rpc_api::{RethNewPayloadInput, TestingBuildBlockRequestV1};
38use std::{
39    pin::Pin,
40    time::{Duration, Instant},
41};
42use tokio::{runtime::Handle, sync::mpsc};
43use tokio_stream::wrappers::ReceiverStream;
44use tracing::{debug, info, warn};
45
46/// `reth benchmark new-payload-fcu` command
47#[derive(Debug, Parser)]
48pub struct Command {
49    /// The RPC url to use for getting data.
50    #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
51    rpc_url: String,
52
53    /// Build a separate fork with `testing_buildBlockV1` and alternate forkchoice updates between
54    /// the canonical chain and that fork on every block while the fork grows up to the configured
55    /// depth.
56    ///
57    /// This requires enabling the hidden `testing` RPC module on the target node,
58    /// for example with `reth node --http --http.api eth,testing`.
59    #[arg(
60        long,
61        value_name = "DEPTH",
62        num_args = 0..=1,
63        default_missing_value = "8",
64        value_parser = parse_reorg_depth,
65        verbatim_doc_comment
66    )]
67    reorg: Option<usize>,
68
69    /// How long to wait after a forkchoice update before sending the next payload.
70    ///
71    /// Accepts a duration string (e.g. `100ms`, `2s`) or a bare integer treated as
72    /// milliseconds (e.g. `400`).
73    #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
74    wait_time: Option<Duration>,
75
76    /// Engine persistence threshold used for deciding when to wait for persistence.
77    ///
78    /// The benchmark waits after every `(threshold + 1)` blocks. By default this
79    /// matches the engine's `DEFAULT_PERSISTENCE_THRESHOLD` (2), so waits occur
80    /// at blocks 3, 6, 9, etc.
81    #[arg(
82        long = "persistence-threshold",
83        value_name = "PERSISTENCE_THRESHOLD",
84        default_value_t = DEFAULT_PERSISTENCE_THRESHOLD,
85        verbatim_doc_comment
86    )]
87    persistence_threshold: u64,
88
89    /// Timeout for waiting on persistence at each checkpoint.
90    ///
91    /// Must be long enough to account for the persistence thread being blocked
92    /// by pruning after the previous save.
93    #[arg(
94        long = "persistence-timeout",
95        value_name = "PERSISTENCE_TIMEOUT",
96        value_parser = parse_duration,
97        default_value = "120s",
98        verbatim_doc_comment
99    )]
100    persistence_timeout: Duration,
101
102    /// The size of the block buffer (channel capacity) for prefetching blocks from the RPC
103    /// endpoint.
104    #[arg(
105        long = "rpc-block-buffer-size",
106        value_name = "RPC_BLOCK_BUFFER_SIZE",
107        default_value = "20",
108        verbatim_doc_comment
109    )]
110    rpc_block_buffer_size: usize,
111
112    /// Weather to enable bal by default or not.
113    #[arg(long, default_value = "false", verbatim_doc_comment)]
114    enable_bal: bool,
115
116    /// The target gas for the big blocks.
117    #[arg(long, value_name = "TARGET_GAS", default_value = "1G", value_parser = super::helpers::parse_gas_limit)]
118    pub big_blocks_target_gas: u64,
119
120    #[command(flatten)]
121    benchmark: BenchmarkArgs,
122}
123
124#[derive(Debug)]
125struct PreparedBuiltBlock {
126    block_hash: B256,
127    params: serde_json::Value,
128}
129
130#[derive(Debug)]
131struct QueuedForkBlock {
132    block_number: u64,
133    prepared: PreparedBuiltBlock,
134}
135
136#[derive(Debug)]
137struct ReorgState {
138    depth: usize,
139    fork_length: usize,
140    branch_point_hash: Option<B256>,
141    fork_parent_hash: Option<B256>,
142}
143
144impl ReorgState {
145    const fn new(depth: usize) -> Self {
146        Self { depth, fork_length: 0, branch_point_hash: None, fork_parent_hash: None }
147    }
148
149    const fn push_fork_head(&mut self, canonical_parent_hash: B256, fork_head_hash: B256) {
150        if self.fork_length == 0 {
151            self.branch_point_hash = Some(canonical_parent_hash);
152        }
153        self.fork_length += 1;
154        self.fork_parent_hash = Some(fork_head_hash);
155    }
156
157    fn forkchoice_state(&self, fork_head_hash: B256) -> eyre::Result<ForkchoiceState> {
158        let branch_point_hash = self.branch_point_hash.ok_or_eyre("missing reorg branch point")?;
159
160        Ok(ForkchoiceState {
161            head_block_hash: fork_head_hash,
162            safe_block_hash: branch_point_hash,
163            finalized_block_hash: branch_point_hash,
164        })
165    }
166
167    const fn reset(&mut self) {
168        self.fork_length = 0;
169        self.branch_point_hash = None;
170        self.fork_parent_hash = None;
171    }
172}
173
174impl Command {
175    /// Execute `benchmark new-payload-fcu` command
176    pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
177        if self.reorg.is_some() && self.benchmark.rlp_blocks {
178            bail!("--reorg cannot be combined with --rlp-blocks")
179        }
180        if self.reorg.is_some() && self.enable_bal {
181            bail!("--reorg cannot be combined with --enable-bal")
182        }
183
184        // Log mode configuration
185        if let Some(duration) = self.wait_time {
186            info!(target: "reth-bench", "Using wait-time mode with {}ms minimum interval between blocks", duration.as_millis());
187        }
188        if let Some(depth) = self.reorg {
189            info!(target: "reth-bench", depth, "Using testing_buildBlockV1 reorg mode");
190        }
191
192        let BenchContext {
193            benchmark_mode,
194            block_provider,
195            auth_provider,
196            local_rpc_provider,
197            next_block,
198            use_reth_namespace,
199            rlp_blocks,
200            wait_for_persistence,
201            no_wait_for_caches,
202            big_blocks_initial_state,
203        } = BenchContext::new(&self.benchmark, self.rpc_url).await?;
204
205        let total_blocks = benchmark_mode.total_blocks();
206
207        let mut metrics_scraper = MetricsScraper::maybe_new(self.benchmark.metrics_url.clone());
208
209        if use_reth_namespace {
210            info!("Using reth_newPayload and reth_forkchoiceUpdated endpoints");
211        }
212
213        let buffer_size = self.rpc_block_buffer_size;
214        let provider = block_provider.clone();
215        let bench_mode = benchmark_mode.clone();
216        let mut blocks: Pin<Box<dyn Stream<Item = eyre::Result<Payload>> + Send>> = Box::pin(
217            stream::iter((next_block..)
218                .take_while(move |next_block| {
219                    bench_mode.contains(*next_block)
220                }))
221                .map(move |next_block| {
222                    let block_provider = provider.clone();
223                    async move {
224                        let block_res = block_provider
225                            .get_block_by_number(next_block.into())
226                            .full()
227                            .await
228                            .wrap_err_with(|| {
229                                format!("Failed to fetch block by number {next_block}")
230                            });
231                        let block =
232                            match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) {
233                                Ok(block) => block,
234                                Err(e) => {
235                                    tracing::error!(target: "reth-bench", "Failed to fetch block {next_block}: {e}");
236                                    return Err(e)
237                                }
238                            };
239
240
241                        let bal = if !rlp_blocks &&
242                            (block.header.block_access_list_hash.is_some() || self.enable_bal)
243                        {
244                            Some(fetch_block_access_list(&block_provider, block.header.number).await?)
245                        } else {
246                            None
247                        };
248
249                        let rlp = if rlp_blocks {
250                            let rlp = match block_provider
251                                .debug_get_raw_block(next_block.into())
252                                .await
253                            {
254                                Ok(rlp) => rlp,
255                                Err(e) => {
256                                    tracing::error!(target: "reth-bench", "Failed to fetch raw block {next_block}: {e}");
257                                    return Err(e.into())
258                                }
259                            };
260                            Some(rlp)
261                        } else {
262                            None
263                        };
264
265                        let head_block_hash = block.header.hash;
266                        let safe_block_hash = block_provider
267                            .get_block_by_number(block.header.number.saturating_sub(32).into());
268
269                        let finalized_block_hash = block_provider
270                            .get_block_by_number(block.header.number.saturating_sub(64).into());
271
272                        let (safe, finalized) =
273                            tokio::join!(safe_block_hash, finalized_block_hash);
274
275                        let safe_block_hash = match safe {
276                            Ok(Some(block)) => block.header.hash,
277                            Ok(None) | Err(_) => head_block_hash,
278                        };
279
280                        let finalized_block_hash = match finalized {
281                            Ok(Some(block)) => block.header.hash,
282                            Ok(None) | Err(_) => head_block_hash,
283                        };
284
285                        let forkchoice = ForkchoiceState {
286                            head_block_hash,
287                            safe_block_hash,
288                            finalized_block_hash,
289                        };
290
291                        Ok(Payload::Block(block, rlp, bal, forkchoice))
292                    }
293                })
294                .buffered(buffer_size),
295        );
296
297        // Big blocks: convert the stream of regular blocks into a stream of big blocks.
298        if let Some(num_big_blocks) = self.benchmark.big_blocks {
299            let block_stream = blocks.map(|res| {
300                res.map(|payload| {
301                    let Payload::Block(block, _, bal, _) = payload else {
302                        unreachable!();
303                    };
304                    Some((block, bal))
305                })
306            });
307            let mut big_blocks = Box::pin(big_blocks_stream(
308                num_big_blocks as u64,
309                self.big_blocks_target_gas,
310                block_stream,
311                big_blocks_initial_state,
312            ));
313            let (tx, rx) = mpsc::channel(buffer_size);
314            tokio::task::spawn_blocking(|| {
315                Handle::current().block_on(async move {
316                    while let Some(big_block) = big_blocks.next().await {
317                        tx.send(big_block.map(Payload::BigBlock)).await.unwrap();
318                    }
319                });
320            });
321
322            blocks = Box::pin(ReceiverStream::new(rx));
323        }
324
325        let mut results = Vec::new();
326        let mut blocks_processed = 0u64;
327        let total_benchmark_duration = Instant::now();
328        let mut total_wait_time = Duration::ZERO;
329        let mut reorg_state = self.reorg.map(ReorgState::new);
330        let mut queued_fork_block = None;
331        while let Some(payload) = {
332            let wait_start = Instant::now();
333            let result = blocks.try_next().await?;
334            total_wait_time += wait_start.elapsed();
335            result
336        } {
337            let gas_used = payload.gas_used();
338            let gas_limit = payload.gas_limit();
339            let block_number = payload.block_number();
340            let canonical_parent_hash = payload.parent_hash();
341            let transaction_count = payload.transaction_count() as u64;
342            let deferred_branch_start_block = reorg_state
343                .as_ref()
344                .filter(|state| state.fork_length == 0 && queued_fork_block.is_none())
345                .map(|_| payload.block().cloned());
346
347            debug!(target: "reth-bench", ?block_number, "Sending payload");
348            let start = Instant::now();
349            let canonical_forkchoice_state = payload.forkchoice_state();
350            let (version, params) = payload.into_new_payload_params(
351                use_reth_namespace,
352                wait_for_persistence,
353                no_wait_for_caches,
354            )?;
355            let server_timings =
356                call_new_payload_with_reth(&auth_provider, version, params).await?;
357            let np_latency =
358                server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
359            let new_payload_result = NewPayloadResult {
360                gas_used,
361                latency: np_latency,
362                persistence_wait: server_timings
363                    .as_ref()
364                    .map(|t| t.persistence_wait)
365                    .unwrap_or_default(),
366                execution_cache_wait: server_timings
367                    .as_ref()
368                    .map(|t| t.execution_cache_wait)
369                    .unwrap_or_default(),
370                sparse_trie_wait: server_timings
371                    .as_ref()
372                    .map(|t| t.sparse_trie_wait)
373                    .unwrap_or_default(),
374            };
375
376            let fcu_start = Instant::now();
377            call_forkchoice_updated_with_reth(&auth_provider, version, canonical_forkchoice_state)
378                .await?;
379            let fcu_latency = fcu_start.elapsed();
380
381            let total_latency =
382                if server_timings.is_some() { np_latency + fcu_latency } else { start.elapsed() };
383            let combined_result = CombinedResult {
384                block_number,
385                gas_limit,
386                transaction_count,
387                new_payload_result,
388                fcu_latency,
389                total_latency,
390            };
391
392            if let Some(reorg_state) = reorg_state.as_mut() {
393                if queued_fork_block.is_none() && reorg_state.fork_length == 0 {
394                    // A branch start uses a canonical parent, so it can be built lazily here
395                    // instead of being queued ahead of time.
396                    let block = deferred_branch_start_block
397                        .as_ref()
398                        .ok_or_eyre("missing deferred fork block for reorg branch start")?;
399                    queued_fork_block = Some(QueuedForkBlock {
400                        block_number,
401                        prepared: prepare_built_block(
402                            &local_rpc_provider,
403                            block
404                                .as_ref()
405                                .ok_or_eyre("missing deferred fork block for reorg branch start")?,
406                            canonical_parent_hash,
407                            no_wait_for_caches,
408                        )
409                        .await?,
410                    });
411                }
412
413                let queued = queued_fork_block
414                    .take()
415                    .ok_or_eyre("missing queued fork block for reorg replay")?;
416                ensure!(
417                    queued.block_number == block_number,
418                    "queued fork block {} does not match source block {}",
419                    queued.block_number,
420                    block_number
421                );
422                let prepared = queued.prepared;
423
424                call_new_payload_with_reth(&auth_provider, None, prepared.params).await?;
425
426                reorg_state.push_fork_head(canonical_parent_hash, prepared.block_hash);
427                let forkchoice_state = reorg_state.forkchoice_state(prepared.block_hash)?;
428
429                info!(
430                    target: "reth-bench",
431                    block_number,
432                    branch_point = %forkchoice_state.safe_block_hash,
433                    fork_head = %prepared.block_hash,
434                    fork_depth = reorg_state.fork_length,
435                    max_reorg_depth = reorg_state.depth,
436                    "Switching forkchoice to reorg branch"
437                );
438
439                let fcu_start = Instant::now();
440                call_forkchoice_updated_with_reth(&auth_provider, None, forkchoice_state).await?;
441                let _fork_fcu_latency = fcu_start.elapsed();
442
443                let next_fork_block_number = block_number + 1;
444                if reorg_state.fork_length < reorg_state.depth {
445                    queued_fork_block = queue_fork_block(
446                        &block_provider,
447                        &local_rpc_provider,
448                        &benchmark_mode,
449                        next_fork_block_number,
450                        Some(prepared.block_hash),
451                        no_wait_for_caches,
452                    )
453                    .await?;
454                } else {
455                    info!(
456                        target: "reth-bench",
457                        block_number,
458                        reorg_depth = reorg_state.depth,
459                        "Resetting reorg branch after reaching max depth"
460                    );
461
462                    // `testing_buildBlockV1` resolves the parent from canonical state, so switch
463                    // back to the source chain before reseeding the next queued fork block.
464                    call_forkchoice_updated_with_reth(
465                        &auth_provider,
466                        version,
467                        canonical_forkchoice_state,
468                    )
469                    .await?;
470
471                    reorg_state.reset();
472                    queued_fork_block = None;
473                }
474            }
475
476            // Exclude time spent waiting on the block prefetch channel from the benchmark duration.
477            // We want to measure engine throughput, not RPC fetch latency.
478            blocks_processed += 1;
479            let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
480            let progress = match total_blocks {
481                Some(total) => format!("{blocks_processed}/{total}"),
482                None => format!("{blocks_processed}"),
483            };
484            info!(target: "reth-bench", progress, %combined_result);
485
486            if let Some(scraper) = metrics_scraper.as_mut() &&
487                let Err(err) = scraper.scrape_after_block(block_number).await
488            {
489                warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
490            }
491
492            if let Some(wait_time) = self.wait_time {
493                let remaining = wait_time.saturating_sub(start.elapsed());
494                if !remaining.is_zero() {
495                    tokio::time::sleep(remaining).await;
496                }
497            }
498
499            let gas_row =
500                TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
501            results.push((gas_row, combined_result));
502        }
503
504        let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
505            results.into_iter().unzip();
506
507        if let Some(ref path) = self.benchmark.output {
508            write_benchmark_results(path, &gas_output_results, &combined_results)?;
509        }
510
511        if let (Some(path), Some(scraper)) = (&self.benchmark.output, &metrics_scraper) {
512            scraper.write_csv(path)?;
513        }
514
515        let gas_output =
516            TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
517
518        info!(
519            target: "reth-bench",
520            total_gas_used = gas_output.total_gas_used,
521            total_duration = ?gas_output.total_duration,
522            execution_duration = ?gas_output.execution_duration,
523            blocks_processed = gas_output.blocks_processed,
524            wall_clock_ggas_per_second = format_args!("{:.4}", gas_output.total_gigagas_per_second()),
525            execution_ggas_per_second = format_args!("{:.4}", gas_output.execution_gigagas_per_second()),
526            "Benchmark complete"
527        );
528
529        Ok(())
530    }
531}
532
533async fn prepare_built_block(
534    block_provider: &RootProvider<AnyNetwork>,
535    block: &AnyRpcBlock,
536    parent_block_hash: B256,
537    no_wait_for_caches: bool,
538) -> eyre::Result<PreparedBuiltBlock> {
539    const MAX_BUILD_ATTEMPTS: usize = 10;
540    const BUILD_RETRY_INTERVAL: Duration = Duration::from_millis(100);
541
542    let request = build_block_request(block, parent_block_hash)?;
543    let built_payload: ExecutionPayloadEnvelopeV5 = {
544        let mut attempts_remaining = MAX_BUILD_ATTEMPTS;
545
546        loop {
547            match block_provider.client().request("testing_buildBlockV1", [request.clone()]).await {
548                Ok(payload) => break payload,
549                Err(err) if attempts_remaining > 1 && is_retryable_build_block_error(&err) => {
550                    warn!(
551                        target: "reth-bench",
552                        block_number = block.header.number,
553                        %parent_block_hash,
554                        attempts_remaining,
555                        error = %err,
556                        "Retrying testing_buildBlockV1 after transient fork build failure"
557                    );
558                    attempts_remaining -= 1;
559                    tokio::time::sleep(BUILD_RETRY_INTERVAL).await;
560                }
561                Err(err) => {
562                    return Err(err).wrap_err_with(|| {
563                        format!(
564                            "Failed to build block {} via testing_buildBlockV1",
565                            block.header.number
566                        )
567                    })
568                }
569            }
570        }
571    };
572
573    let payload = &built_payload.execution_payload.payload_inner.payload_inner;
574    let block_hash = payload.block_hash;
575    let (payload, sidecar) = built_payload
576        .into_payload_and_sidecar(block.header.parent_beacon_block_root.unwrap_or_default());
577    // Fork payloads are built immediately before the next `testing_buildBlockV1` call. Leaving
578    // reth's default persistence wait enabled here gives the regular RPC side a consistent base
579    // state for the next synthetic fork block build.
580    let params = serde_json::to_value((
581        RethNewPayloadInput::ExecutionData(ExecutionData { payload, sidecar }),
582        None::<bool>,
583        no_wait_for_caches.then_some(false),
584    ))?;
585
586    Ok(PreparedBuiltBlock { block_hash, params })
587}
588
589#[allow(clippy::too_many_arguments)]
590async fn queue_fork_block(
591    block_provider: &RootProvider<AnyNetwork>,
592    local_rpc_provider: &RootProvider<AnyNetwork>,
593    benchmark_mode: &crate::bench_mode::BenchMode,
594    block_number: u64,
595    parent_block_hash: Option<B256>,
596    no_wait_for_caches: bool,
597) -> eyre::Result<Option<QueuedForkBlock>> {
598    if !benchmark_mode.contains(block_number) {
599        return Ok(None)
600    }
601
602    let future_block = block_provider
603        .get_block_by_number(alloy_eips::BlockNumberOrTag::Number(block_number))
604        .full()
605        .await
606        .wrap_err_with(|| format!("Failed to fetch block by number {block_number}"))?
607        .ok_or_eyre("Block not found")?;
608    let parent_block_hash = parent_block_hash.unwrap_or(future_block.header.parent_hash);
609
610    Ok(Some(QueuedForkBlock {
611        block_number,
612        prepared: prepare_built_block(
613            local_rpc_provider,
614            &future_block,
615            parent_block_hash,
616            no_wait_for_caches,
617        )
618        .await?,
619    }))
620}
621
622fn is_retryable_build_block_error(err: &alloy_transport::TransportError) -> bool {
623    let message = err.to_string();
624    message.contains("block not found: hash") ||
625        message.contains("block hash not found for block number")
626}
627
628fn build_block_request(
629    block: &AnyRpcBlock,
630    parent_block_hash: B256,
631) -> eyre::Result<TestingBuildBlockRequestV1> {
632    let mut transactions = block
633        .clone()
634        .try_into_transactions()
635        .map_err(|_| eyre::eyre!("Block transactions must be fetched in full for --reorg"))?
636        .into_iter()
637        .map(|tx| {
638            let tx: TxEnvelope =
639                tx.try_into().map_err(|_| eyre::eyre!("unsupported tx type in RPC block"))?;
640            if tx.is_eip4844() {
641                return Ok(None)
642            }
643            Ok(Some(tx.encoded_2718().into()))
644        })
645        .filter_map(|tx| tx.transpose())
646        .collect::<eyre::Result<Vec<_>>>()?;
647
648    // `testing_buildBlockV1` only takes raw transaction bytes, so we exclude blob transactions
649    // from the synthetic fork blocks rather than trying to reconstruct their sidecars.
650    // Keep only 90% of the remaining transactions so the alternate branch produces a materially
651    // different post-state instead of only differing by header data.
652    let keep = transactions.len().saturating_mul(9) / 10;
653    transactions.truncate(keep);
654
655    let rpc_block = block.clone().into_inner();
656
657    Ok(TestingBuildBlockRequestV1 {
658        parent_block_hash,
659        payload_attributes: PayloadAttributes {
660            timestamp: block.header.timestamp,
661            prev_randao: block.header.mix_hash.unwrap_or_default(),
662            suggested_fee_recipient: block.header.beneficiary,
663            withdrawals: rpc_block.withdrawals.map(|withdrawals| withdrawals.into_inner()),
664            parent_beacon_block_root: block.header.parent_beacon_block_root,
665            slot_number: block.header.slot_number,
666        },
667        transactions,
668        extra_data: Some(block.header.extra_data.clone()),
669    })
670}
671
/// Parses the `--reorg` depth argument.
///
/// Surrounding whitespace is ignored. The value must parse as a `usize` and be non-zero;
/// otherwise a human-readable error message is returned.
fn parse_reorg_depth(value: &str) -> Result<usize, String> {
    match value.trim().parse::<usize>() {
        Ok(0) => Err("reorg depth must be greater than 0".to_string()),
        Ok(depth) => Ok(depth),
        Err(_) => Err(format!("invalid reorg depth {value:?}, expected a positive integer")),
    }
}
684
685/// Payload types used during benchmarking.
686#[expect(clippy::large_enum_variant)]
687enum Payload {
688    /// A regular block payload.
689    Block(AnyRpcBlock, Option<Bytes>, Option<BlockAccessList>, ForkchoiceState),
690    /// A big block payload.
691    BigBlock(BigBlockData<ExecutionData>),
692}
693
694impl Payload {
695    fn into_new_payload_params(
696        self,
697        use_reth_namespace: bool,
698        wait_for_persistence: WaitForPersistence,
699        no_wait_for_caches: bool,
700    ) -> eyre::Result<(Option<EngineApiMessageVersion>, serde_json::Value)> {
701        match self {
702            Self::Block(block, rlp, bal, _) => block_to_new_payload(
703                block,
704                rlp,
705                use_reth_namespace,
706                wait_for_persistence,
707                no_wait_for_caches,
708                bal,
709            ),
710            Self::BigBlock(big_block) => {
711                let wait_for_persistence = wait_for_persistence.rpc_value(big_block.block_number());
712                Ok((
713                    None,
714                    serde_json::to_value((
715                        RethNewPayloadInput::ExecutionData(big_block),
716                        wait_for_persistence,
717                        no_wait_for_caches.then_some(false),
718                    ))?,
719                ))
720            }
721        }
722    }
723
724    fn gas_used(&self) -> u64 {
725        match self {
726            Self::Block(block, _, _, _) => block.header.gas_used,
727            Self::BigBlock(big_block) => big_block.gas_used(),
728        }
729    }
730
731    fn gas_limit(&self) -> u64 {
732        match self {
733            Self::Block(block, _, _, _) => block.header.gas_limit,
734            Self::BigBlock(big_block) => big_block.gas_limit(),
735        }
736    }
737
738    fn block_number(&self) -> u64 {
739        match self {
740            Self::Block(block, _, _, _) => block.header.number,
741            Self::BigBlock(big_block) => big_block.block_number(),
742        }
743    }
744
745    fn parent_hash(&self) -> B256 {
746        match self {
747            Self::Block(block, _, _, _) => block.header.parent_hash,
748            Self::BigBlock(big_block) => big_block.parent_hash(),
749        }
750    }
751
752    fn transaction_count(&self) -> usize {
753        match self {
754            Self::Block(block, _, _, _) => block.transactions.len(),
755            Self::BigBlock(big_block) => big_block.transaction_count(),
756        }
757    }
758
759    const fn block(&self) -> Option<&AnyRpcBlock> {
760        match self {
761            Self::Block(block, _, _, _) => Some(block),
762            Self::BigBlock(_) => None,
763        }
764    }
765
766    fn forkchoice_state(&self) -> ForkchoiceState {
767        match self {
768            Self::Block(_, _, _, forkchoice) => *forkchoice,
769            Self::BigBlock(big_block) => ForkchoiceState {
770                head_block_hash: big_block.block_hash(),
771                safe_block_hash: big_block.block_hash(),
772                finalized_block_hash: big_block.block_hash(),
773            },
774        }
775    }
776}