reth_bench/bench/
generate_big_block.rs

1//! Command for generating large blocks by packing transactions from real blocks.
2//!
3//! This command fetches transactions from existing blocks and packs them into a single
4//! large block using the `testing_buildBlockV1` RPC endpoint.
5
6use crate::{
7    authenticated_transport::AuthenticatedTransportConnect, bench::helpers::parse_gas_limit,
8};
9use alloy_eips::{BlockNumberOrTag, Typed2718};
10use alloy_primitives::{Bytes, B256};
11use alloy_provider::{ext::EngineApi, network::AnyNetwork, Provider, RootProvider};
12use alloy_rpc_client::ClientBuilder;
13use alloy_rpc_types_engine::{
14    ExecutionPayloadEnvelopeV4, ExecutionPayloadEnvelopeV5, ForkchoiceState, JwtSecret,
15    PayloadAttributes,
16};
17use alloy_transport::layers::RetryBackoffLayer;
18use clap::Parser;
19use eyre::Context;
20use reqwest::Url;
21use reth_cli_runner::CliContext;
22use reth_rpc_api::TestingBuildBlockRequestV1;
23use std::future::Future;
24use tokio::sync::mpsc;
25use tracing::{info, warn};
26
27/// A single transaction with its gas used and raw encoded bytes.
28#[derive(Debug, Clone)]
29pub struct RawTransaction {
30    /// The actual gas used by the transaction (from receipt).
31    pub gas_used: u64,
32    /// The transaction type (e.g., 3 for EIP-4844 blob txs).
33    pub tx_type: u8,
34    /// The raw RLP-encoded transaction bytes.
35    pub raw: Bytes,
36}
37
38/// Abstraction over sources of transactions for big block generation.
39///
40/// Implementors provide transactions from different sources (RPC, database, files, etc.)
41pub trait TransactionSource {
42    /// Fetch transactions from a specific block number.
43    ///
44    /// Returns `Ok(None)` if the block doesn't exist.
45    /// Returns `Ok(Some((transactions, gas_used)))` with the block's transactions and total gas.
46    fn fetch_block_transactions(
47        &self,
48        block_number: u64,
49    ) -> impl Future<Output = eyre::Result<Option<(Vec<RawTransaction>, u64)>>> + Send;
50}
51
52/// RPC-based transaction source that fetches from a remote node.
53#[derive(Debug)]
54pub struct RpcTransactionSource {
55    provider: RootProvider<AnyNetwork>,
56}
57
58impl RpcTransactionSource {
59    /// Create a new RPC transaction source.
60    pub const fn new(provider: RootProvider<AnyNetwork>) -> Self {
61        Self { provider }
62    }
63
64    /// Create from an RPC URL with retry backoff.
65    pub fn from_url(rpc_url: &str) -> eyre::Result<Self> {
66        let client = ClientBuilder::default()
67            .layer(RetryBackoffLayer::new(10, 800, u64::MAX))
68            .http(rpc_url.parse()?);
69        let provider = RootProvider::<AnyNetwork>::new(client);
70        Ok(Self { provider })
71    }
72}
73
74impl TransactionSource for RpcTransactionSource {
75    async fn fetch_block_transactions(
76        &self,
77        block_number: u64,
78    ) -> eyre::Result<Option<(Vec<RawTransaction>, u64)>> {
79        // Fetch block and receipts in parallel
80        let (block, receipts) = tokio::try_join!(
81            self.provider.get_block_by_number(block_number.into()).full(),
82            self.provider.get_block_receipts(block_number.into())
83        )?;
84
85        let Some(block) = block else {
86            return Ok(None);
87        };
88
89        let Some(receipts) = receipts else {
90            return Err(eyre::eyre!("Receipts not found for block {}", block_number));
91        };
92
93        let block_gas_used = block.header.gas_used;
94
95        // Convert cumulative gas from receipts to per-tx gas_used
96        let mut prev_cumulative = 0u64;
97        let transactions: Vec<RawTransaction> = block
98            .transactions
99            .txns()
100            .zip(receipts.iter())
101            .map(|(tx, receipt)| {
102                let cumulative = receipt.inner.inner.inner.receipt.cumulative_gas_used;
103                let gas_used = cumulative - prev_cumulative;
104                prev_cumulative = cumulative;
105
106                let with_encoded = tx.inner.inner.clone().into_encoded();
107                RawTransaction {
108                    gas_used,
109                    tx_type: tx.inner.ty(),
110                    raw: with_encoded.encoded_bytes().clone(),
111                }
112            })
113            .collect();
114
115        Ok(Some((transactions, block_gas_used)))
116    }
117}
118
119/// Collects transactions from a source up to a target gas usage.
120#[derive(Debug)]
121pub struct TransactionCollector<S> {
122    source: S,
123    target_gas: u64,
124}
125
126impl<S: TransactionSource> TransactionCollector<S> {
127    /// Create a new transaction collector.
128    pub const fn new(source: S, target_gas: u64) -> Self {
129        Self { source, target_gas }
130    }
131
132    /// Collect transactions starting from the given block number.
133    ///
134    /// Skips blob transactions (type 3) and collects until target gas is reached.
135    /// Returns the collected raw transaction bytes, total gas used, and the next block number.
136    pub async fn collect(&self, start_block: u64) -> eyre::Result<(Vec<Bytes>, u64, u64)> {
137        let mut transactions: Vec<Bytes> = Vec::new();
138        let mut total_gas: u64 = 0;
139        let mut current_block = start_block;
140
141        while total_gas < self.target_gas {
142            let Some((block_txs, _)) = self.source.fetch_block_transactions(current_block).await?
143            else {
144                warn!(block = current_block, "Block not found, stopping");
145                break;
146            };
147
148            for tx in block_txs {
149                // Skip blob transactions (EIP-4844, type 3)
150                if tx.tx_type == 3 {
151                    continue;
152                }
153
154                if total_gas + tx.gas_used <= self.target_gas {
155                    transactions.push(tx.raw);
156                    total_gas += tx.gas_used;
157                }
158
159                if total_gas >= self.target_gas {
160                    break;
161                }
162            }
163
164            current_block += 1;
165
166            // Stop early if remaining gas is under 1M (close enough to target)
167            let remaining_gas = self.target_gas.saturating_sub(total_gas);
168            if remaining_gas < 1_000_000 {
169                break;
170            }
171        }
172
173        info!(
174            total_txs = transactions.len(),
175            total_gas,
176            next_block = current_block,
177            "Finished collecting transactions"
178        );
179
180        Ok((transactions, total_gas, current_block))
181    }
182}
183
184/// `reth bench generate-big-block` command
185///
186/// Generates a large block by fetching transactions from existing blocks and packing them
187/// into a single block using the `testing_buildBlockV1` RPC endpoint.
188#[derive(Debug, Parser)]
189pub struct Command {
190    /// The RPC URL to use for fetching blocks (can be an external archive node).
191    #[arg(long, value_name = "RPC_URL")]
192    rpc_url: String,
193
194    /// The engine RPC URL (with JWT authentication).
195    #[arg(long, value_name = "ENGINE_RPC_URL", default_value = "http://localhost:8551")]
196    engine_rpc_url: String,
197
198    /// The RPC URL for `testing_buildBlockV1` calls (same node as engine, regular RPC port).
199    #[arg(long, value_name = "TESTING_RPC_URL", default_value = "http://localhost:8545")]
200    testing_rpc_url: String,
201
202    /// Path to the JWT secret file for engine API authentication.
203    #[arg(long, value_name = "JWT_SECRET")]
204    jwt_secret: std::path::PathBuf,
205
206    /// Target gas to pack into the block.
207    /// Accepts short notation: K for thousand, M for million, G for billion (e.g., 1G = 1
208    /// billion).
209    #[arg(long, value_name = "TARGET_GAS", default_value = "30000000", value_parser = parse_gas_limit)]
210    target_gas: u64,
211
212    /// Block number to start fetching transactions from (required).
213    ///
214    /// This must be the last canonical block BEFORE any gas limit ramping was performed.
215    /// The command collects transactions from historical blocks starting at this number
216    /// to pack into large blocks.
217    ///
218    /// How to determine this value:
219    /// - If starting from a fresh node (no gas limit ramp yet): use the current chain tip
220    /// - If gas limit ramping has already been performed: use the block number that was the chain
221    ///   tip BEFORE ramping began (you must track this yourself)
222    ///
223    /// Using a block after ramping started will cause transaction collection to fail
224    /// because those blocks contain synthetic transactions that cannot be replayed.
225    #[arg(long, value_name = "FROM_BLOCK")]
226    from_block: u64,
227
228    /// Execute the payload (call newPayload + forkchoiceUpdated).
229    /// If false, only builds the payload and prints it.
230    #[arg(long, default_value = "false")]
231    execute: bool,
232
233    /// Number of payloads to generate. Each payload uses the previous as parent.
234    /// When count == 1, the payload is only generated and saved, not executed.
235    /// When count > 1, each payload is executed before building the next.
236    #[arg(long, default_value = "1")]
237    count: u64,
238
239    /// Number of transaction batches to prefetch in background when count > 1.
240    /// Higher values reduce latency but use more memory.
241    #[arg(long, default_value = "4")]
242    prefetch_buffer: usize,
243
244    /// Output directory for generated payloads. Each payload is saved as `payload_block_N.json`.
245    #[arg(long, value_name = "OUTPUT_DIR")]
246    output_dir: std::path::PathBuf,
247}
248
249/// A built payload ready for execution.
250struct BuiltPayload {
251    block_number: u64,
252    envelope: ExecutionPayloadEnvelopeV4,
253    block_hash: B256,
254    timestamp: u64,
255}
256
257impl Command {
258    /// Execute the `generate-big-block` command
259    pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
260        info!(target_gas = self.target_gas, count = self.count, "Generating big block(s)");
261
262        // Set up authenticated engine provider
263        let jwt =
264            std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
265        let jwt = JwtSecret::from_hex(jwt.trim())?;
266        let auth_url = Url::parse(&self.engine_rpc_url)?;
267
268        info!("Connecting to Engine RPC at {}", auth_url);
269        let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt);
270        let auth_client = ClientBuilder::default().connect_with(auth_transport).await?;
271        let auth_provider = RootProvider::<AnyNetwork>::new(auth_client);
272
273        // Set up testing RPC provider (for testing_buildBlockV1)
274        info!("Connecting to Testing RPC at {}", self.testing_rpc_url);
275        let testing_client = ClientBuilder::default()
276            .layer(RetryBackoffLayer::new(10, 800, u64::MAX))
277            .http(self.testing_rpc_url.parse()?);
278        let testing_provider = RootProvider::<AnyNetwork>::new(testing_client);
279
280        // Get the parent block (latest canonical block)
281        info!(endpoint = "engine", method = "eth_getBlockByNumber", block = "latest", "RPC call");
282        let parent_block = auth_provider
283            .get_block_by_number(BlockNumberOrTag::Latest)
284            .await?
285            .ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
286
287        let parent_hash = parent_block.header.hash;
288        let parent_number = parent_block.header.number;
289        let parent_timestamp = parent_block.header.timestamp;
290
291        info!(
292            parent_hash = %parent_hash,
293            parent_number = parent_number,
294            "Using initial parent block"
295        );
296
297        // Create output directory
298        std::fs::create_dir_all(&self.output_dir).wrap_err_with(|| {
299            format!("Failed to create output directory: {:?}", self.output_dir)
300        })?;
301
302        let start_block = self.from_block;
303
304        // Use pipelined execution when generating multiple payloads
305        if self.count > 1 {
306            self.execute_pipelined(
307                &auth_provider,
308                &testing_provider,
309                start_block,
310                parent_hash,
311                parent_timestamp,
312            )
313            .await?;
314        } else {
315            // Single payload - collect transactions and build
316            let tx_source = RpcTransactionSource::from_url(&self.rpc_url)?;
317            let collector = TransactionCollector::new(tx_source, self.target_gas);
318            let (transactions, _total_gas, _next_block) = collector.collect(start_block).await?;
319
320            if transactions.is_empty() {
321                return Err(eyre::eyre!("No transactions collected"));
322            }
323
324            self.execute_sequential(
325                &auth_provider,
326                &testing_provider,
327                transactions,
328                parent_hash,
329                parent_timestamp,
330            )
331            .await?;
332        }
333
334        info!(count = self.count, output_dir = %self.output_dir.display(), "All payloads generated");
335        Ok(())
336    }
337
338    /// Sequential execution path for single payload or no-execute mode.
339    async fn execute_sequential(
340        &self,
341        auth_provider: &RootProvider<AnyNetwork>,
342        testing_provider: &RootProvider<AnyNetwork>,
343        transactions: Vec<Bytes>,
344        mut parent_hash: B256,
345        mut parent_timestamp: u64,
346    ) -> eyre::Result<()> {
347        for i in 0..self.count {
348            info!(
349                payload = i + 1,
350                total = self.count,
351                parent_hash = %parent_hash,
352                parent_timestamp = parent_timestamp,
353                "Building payload via testing_buildBlockV1"
354            );
355
356            let built = self
357                .build_payload(testing_provider, &transactions, i, parent_hash, parent_timestamp)
358                .await?;
359
360            self.save_payload(&built)?;
361
362            if self.execute || self.count > 1 {
363                info!(payload = i + 1, block_hash = %built.block_hash, "Executing payload (newPayload + FCU)");
364                self.execute_payload_v4(auth_provider, built.envelope, parent_hash).await?;
365                info!(payload = i + 1, "Payload executed successfully");
366            }
367
368            parent_hash = built.block_hash;
369            parent_timestamp = built.timestamp;
370        }
371        Ok(())
372    }
373
374    /// Pipelined execution - fetches transactions and builds payloads in background.
375    async fn execute_pipelined(
376        &self,
377        auth_provider: &RootProvider<AnyNetwork>,
378        testing_provider: &RootProvider<AnyNetwork>,
379        start_block: u64,
380        initial_parent_hash: B256,
381        initial_parent_timestamp: u64,
382    ) -> eyre::Result<()> {
383        // Create channel for transaction batches (one batch per payload)
384        let (tx_sender, mut tx_receiver) = mpsc::channel::<Vec<Bytes>>(self.prefetch_buffer);
385
386        // Spawn background task to continuously fetch transaction batches
387        let rpc_url = self.rpc_url.clone();
388        let target_gas = self.target_gas;
389        let count = self.count;
390
391        let fetcher_handle = tokio::spawn(async move {
392            let tx_source = match RpcTransactionSource::from_url(&rpc_url) {
393                Ok(source) => source,
394                Err(e) => {
395                    warn!(error = %e, "Failed to create transaction source");
396                    return;
397                }
398            };
399
400            let collector = TransactionCollector::new(tx_source, target_gas);
401            let mut current_block = start_block;
402
403            for payload_idx in 0..count {
404                match collector.collect(current_block).await {
405                    Ok((transactions, total_gas, next_block)) => {
406                        info!(
407                            payload = payload_idx + 1,
408                            tx_count = transactions.len(),
409                            total_gas,
410                            blocks = format!("{}..{}", current_block, next_block),
411                            "Fetched transactions"
412                        );
413                        current_block = next_block;
414
415                        if tx_sender.send(transactions).await.is_err() {
416                            break;
417                        }
418                    }
419                    Err(e) => {
420                        warn!(payload = payload_idx + 1, error = %e, "Failed to fetch transactions");
421                        break;
422                    }
423                }
424            }
425        });
426
427        let mut parent_hash = initial_parent_hash;
428        let mut parent_timestamp = initial_parent_timestamp;
429        let mut pending_build: Option<tokio::task::JoinHandle<eyre::Result<BuiltPayload>>> = None;
430
431        for i in 0..self.count {
432            let is_last = i == self.count - 1;
433
434            // Get current payload (either from pending build or build now)
435            let current_payload = if let Some(handle) = pending_build.take() {
436                handle.await??
437            } else {
438                // First payload - wait for transactions and build synchronously
439                let transactions = tx_receiver
440                    .recv()
441                    .await
442                    .ok_or_else(|| eyre::eyre!("Transaction fetcher stopped unexpectedly"))?;
443
444                if transactions.is_empty() {
445                    return Err(eyre::eyre!("No transactions collected for payload {}", i + 1));
446                }
447
448                info!(
449                    payload = i + 1,
450                    total = self.count,
451                    parent_hash = %parent_hash,
452                    parent_timestamp = parent_timestamp,
453                    tx_count = transactions.len(),
454                    "Building payload via testing_buildBlockV1"
455                );
456                self.build_payload(
457                    testing_provider,
458                    &transactions,
459                    i,
460                    parent_hash,
461                    parent_timestamp,
462                )
463                .await?
464            };
465
466            self.save_payload(&current_payload)?;
467
468            let current_block_hash = current_payload.block_hash;
469            let current_timestamp = current_payload.timestamp;
470
471            // Execute current payload first
472            info!(payload = i + 1, block_hash = %current_block_hash, "Executing payload (newPayload + FCU)");
473            self.execute_payload_v4(auth_provider, current_payload.envelope, parent_hash).await?;
474            info!(payload = i + 1, "Payload executed successfully");
475
476            // Start building next payload in background (if not last) - AFTER execution
477            if !is_last {
478                // Get transactions for next payload (should already be fetched or fetching)
479                let next_transactions = tx_receiver
480                    .recv()
481                    .await
482                    .ok_or_else(|| eyre::eyre!("Transaction fetcher stopped unexpectedly"))?;
483
484                if next_transactions.is_empty() {
485                    return Err(eyre::eyre!("No transactions collected for payload {}", i + 2));
486                }
487
488                let testing_provider = testing_provider.clone();
489                let next_index = i + 1;
490                let total = self.count;
491
492                pending_build = Some(tokio::spawn(async move {
493                    info!(
494                        payload = next_index + 1,
495                        total = total,
496                        parent_hash = %current_block_hash,
497                        parent_timestamp = current_timestamp,
498                        tx_count = next_transactions.len(),
499                        "Building payload via testing_buildBlockV1"
500                    );
501
502                    Self::build_payload_static(
503                        &testing_provider,
504                        &next_transactions,
505                        next_index,
506                        current_block_hash,
507                        current_timestamp,
508                    )
509                    .await
510                }));
511            }
512
513            parent_hash = current_block_hash;
514            parent_timestamp = current_timestamp;
515        }
516
517        // Clean up the fetcher task
518        drop(tx_receiver);
519        let _ = fetcher_handle.await;
520
521        Ok(())
522    }
523
524    /// Build a single payload via `testing_buildBlockV1`.
525    async fn build_payload(
526        &self,
527        testing_provider: &RootProvider<AnyNetwork>,
528        transactions: &[Bytes],
529        index: u64,
530        parent_hash: B256,
531        parent_timestamp: u64,
532    ) -> eyre::Result<BuiltPayload> {
533        Self::build_payload_static(
534            testing_provider,
535            transactions,
536            index,
537            parent_hash,
538            parent_timestamp,
539        )
540        .await
541    }
542
543    /// Static version for use in spawned tasks.
544    async fn build_payload_static(
545        testing_provider: &RootProvider<AnyNetwork>,
546        transactions: &[Bytes],
547        index: u64,
548        parent_hash: B256,
549        parent_timestamp: u64,
550    ) -> eyre::Result<BuiltPayload> {
551        let request = TestingBuildBlockRequestV1 {
552            parent_block_hash: parent_hash,
553            payload_attributes: PayloadAttributes {
554                timestamp: parent_timestamp + 12,
555                prev_randao: B256::ZERO,
556                suggested_fee_recipient: alloy_primitives::Address::ZERO,
557                withdrawals: Some(vec![]),
558                parent_beacon_block_root: Some(B256::ZERO),
559            },
560            transactions: transactions.to_vec(),
561            extra_data: None,
562        };
563
564        let total_tx_bytes: usize = transactions.iter().map(|tx| tx.len()).sum();
565        info!(
566            payload = index + 1,
567            tx_count = transactions.len(),
568            total_tx_bytes = total_tx_bytes,
569            parent_hash = %parent_hash,
570            "Sending to testing_buildBlockV1"
571        );
572        let envelope: ExecutionPayloadEnvelopeV5 =
573            testing_provider.client().request("testing_buildBlockV1", [request]).await?;
574
575        let v4_envelope = envelope.try_into_v4()?;
576
577        let inner = &v4_envelope.envelope_inner.execution_payload.payload_inner.payload_inner;
578        let block_hash = inner.block_hash;
579        let block_number = inner.block_number;
580        let timestamp = inner.timestamp;
581
582        Ok(BuiltPayload { block_number, envelope: v4_envelope, block_hash, timestamp })
583    }
584
585    /// Save a payload to disk.
586    fn save_payload(&self, payload: &BuiltPayload) -> eyre::Result<()> {
587        let filename = format!("payload_block_{}.json", payload.block_number);
588        let filepath = self.output_dir.join(&filename);
589        let json = serde_json::to_string_pretty(&payload.envelope)?;
590        std::fs::write(&filepath, &json)
591            .wrap_err_with(|| format!("Failed to write payload to {:?}", filepath))?;
592        info!(block_number = payload.block_number, block_hash = %payload.block_hash, path = %filepath.display(), "Payload saved");
593        Ok(())
594    }
595
596    async fn execute_payload_v4(
597        &self,
598        provider: &RootProvider<AnyNetwork>,
599        envelope: ExecutionPayloadEnvelopeV4,
600        parent_hash: B256,
601    ) -> eyre::Result<()> {
602        let block_hash =
603            envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
604
605        let status = provider
606            .new_payload_v4(
607                envelope.envelope_inner.execution_payload,
608                vec![],
609                B256::ZERO,
610                envelope.execution_requests.to_vec(),
611            )
612            .await?;
613
614        if !status.is_valid() {
615            return Err(eyre::eyre!("Payload rejected: {:?}", status));
616        }
617
618        let fcu_state = ForkchoiceState {
619            head_block_hash: block_hash,
620            safe_block_hash: parent_hash,
621            finalized_block_hash: parent_hash,
622        };
623
624        let fcu_result = provider.fork_choice_updated_v3(fcu_state, None).await?;
625
626        if !fcu_result.is_valid() {
627            return Err(eyre::eyre!("FCU rejected: {:?}", fcu_result));
628        }
629
630        Ok(())
631    }
632}