reth/commands/debug_cmd/
execution.rs

1//! Command for debugging execution.
2
3use crate::{args::NetworkArgs, utils::get_single_header};
4use alloy_eips::BlockHashOrNumber;
5use alloy_primitives::{BlockNumber, B256};
6use clap::Parser;
7use futures::StreamExt;
8use reth_chainspec::ChainSpec;
9use reth_cli::chainspec::ChainSpecParser;
10use reth_cli_commands::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
11use reth_cli_runner::CliContext;
12use reth_cli_util::get_secret_key;
13use reth_config::Config;
14use reth_consensus::FullConsensus;
15use reth_db::DatabaseEnv;
16use reth_downloaders::{
17    bodies::bodies::BodiesDownloaderBuilder,
18    headers::reverse_headers::ReverseHeadersDownloaderBuilder,
19};
20use reth_errors::ConsensusError;
21use reth_ethereum_primitives::EthPrimitives;
22use reth_exex::ExExManagerHandle;
23use reth_network::{BlockDownloaderProvider, NetworkHandle};
24use reth_network_api::NetworkInfo;
25use reth_network_p2p::{headers::client::HeadersClient, EthBlockClient};
26use reth_node_api::NodeTypesWithDBAdapter;
27use reth_node_ethereum::{consensus::EthBeaconConsensus, EthExecutorProvider};
28use reth_node_events::node::NodeEvent;
29use reth_provider::{
30    providers::ProviderNodeTypes, ChainSpecProvider, ProviderFactory, StageCheckpointReader,
31};
32use reth_prune::PruneModes;
33use reth_stages::{
34    sets::DefaultStages, stages::ExecutionStage, ExecutionStageThresholds, Pipeline, StageId,
35    StageSet,
36};
37use reth_static_file::StaticFileProducer;
38use reth_tasks::TaskExecutor;
39use std::{path::PathBuf, sync::Arc};
40use tokio::sync::watch;
41use tracing::*;
42
43/// `reth debug execution` command
44#[derive(Debug, Parser)]
45pub struct Command<C: ChainSpecParser> {
46    #[command(flatten)]
47    env: EnvironmentArgs<C>,
48
49    #[command(flatten)]
50    network: NetworkArgs,
51
52    /// The maximum block height.
53    #[arg(long)]
54    pub to: u64,
55
56    /// The block interval for sync and unwind.
57    /// Defaults to `1000`.
58    #[arg(long, default_value = "1000")]
59    pub interval: u64,
60}
61
62impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
63    fn build_pipeline<N, Client>(
64        &self,
65        config: &Config,
66        client: Client,
67        consensus: Arc<dyn FullConsensus<N::Primitives, Error = ConsensusError>>,
68        provider_factory: ProviderFactory<N>,
69        task_executor: &TaskExecutor,
70        static_file_producer: StaticFileProducer<ProviderFactory<N>>,
71    ) -> eyre::Result<Pipeline<N>>
72    where
73        N: ProviderNodeTypes<ChainSpec = C::ChainSpec, Primitives = EthPrimitives> + CliNodeTypes,
74        Client: EthBlockClient + 'static,
75    {
76        // building network downloaders using the fetch client
77        let header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers)
78            .build(client.clone(), consensus.clone().as_header_validator())
79            .into_task_with(task_executor);
80
81        let body_downloader = BodiesDownloaderBuilder::new(config.stages.bodies)
82            .build(client, consensus.clone().as_consensus(), provider_factory.clone())
83            .into_task_with(task_executor);
84
85        let stage_conf = &config.stages;
86        let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default();
87
88        let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
89        let executor = EthExecutorProvider::ethereum(provider_factory.chain_spec());
90
91        let pipeline = Pipeline::<N>::builder()
92            .with_tip_sender(tip_tx)
93            .add_stages(
94                DefaultStages::new(
95                    provider_factory.clone(),
96                    tip_rx,
97                    consensus.clone(),
98                    header_downloader,
99                    body_downloader,
100                    executor.clone(),
101                    stage_conf.clone(),
102                    prune_modes,
103                )
104                .set(ExecutionStage::new(
105                    executor,
106                    consensus.clone(),
107                    ExecutionStageThresholds {
108                        max_blocks: None,
109                        max_changes: None,
110                        max_cumulative_gas: None,
111                        max_duration: None,
112                    },
113                    stage_conf.execution_external_clean_threshold(),
114                    ExExManagerHandle::empty(),
115                )),
116            )
117            .build(provider_factory, static_file_producer);
118
119        Ok(pipeline)
120    }
121
122    async fn build_network<
123        N: CliNodeTypes<ChainSpec = C::ChainSpec, Primitives = EthPrimitives>,
124    >(
125        &self,
126        config: &Config,
127        task_executor: TaskExecutor,
128        provider_factory: ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
129        network_secret_path: PathBuf,
130        default_peers_path: PathBuf,
131    ) -> eyre::Result<NetworkHandle> {
132        let secret_key = get_secret_key(&network_secret_path)?;
133        let network = self
134            .network
135            .network_config(config, provider_factory.chain_spec(), secret_key, default_peers_path)
136            .with_task_executor(Box::new(task_executor))
137            .build(provider_factory)
138            .start_network()
139            .await?;
140        info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
141        debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID");
142        Ok(network)
143    }
144
145    async fn fetch_block_hash<Client>(
146        &self,
147        client: Client,
148        block: BlockNumber,
149    ) -> eyre::Result<B256>
150    where
151        Client: HeadersClient<Header: reth_primitives_traits::BlockHeader>,
152    {
153        info!(target: "reth::cli", ?block, "Fetching block from the network.");
154        loop {
155            match get_single_header(&client, BlockHashOrNumber::Number(block)).await {
156                Ok(tip_header) => {
157                    info!(target: "reth::cli", ?block, "Successfully fetched block");
158                    return Ok(tip_header.hash())
159                }
160                Err(error) => {
161                    error!(target: "reth::cli", ?block, %error, "Failed to fetch the block. Retrying...");
162                }
163            }
164        }
165    }
166
167    /// Execute `execution-debug` command
168    pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec, Primitives = EthPrimitives>>(
169        self,
170        ctx: CliContext,
171    ) -> eyre::Result<()> {
172        let Environment { provider_factory, config, data_dir } =
173            self.env.init::<N>(AccessRights::RW)?;
174
175        let consensus: Arc<dyn FullConsensus<N::Primitives, Error = ConsensusError>> =
176            Arc::new(EthBeaconConsensus::new(provider_factory.chain_spec()));
177
178        // Configure and build network
179        let network_secret_path =
180            self.network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret());
181        let network = self
182            .build_network(
183                &config,
184                ctx.task_executor.clone(),
185                provider_factory.clone(),
186                network_secret_path,
187                data_dir.known_peers(),
188            )
189            .await?;
190
191        let static_file_producer =
192            StaticFileProducer::new(provider_factory.clone(), PruneModes::default());
193
194        // Configure the pipeline
195        let fetch_client = network.fetch_client().await?;
196        let mut pipeline = self.build_pipeline(
197            &config,
198            fetch_client.clone(),
199            consensus.clone(),
200            provider_factory.clone(),
201            &ctx.task_executor,
202            static_file_producer,
203        )?;
204
205        let provider = provider_factory.provider()?;
206
207        let latest_block_number =
208            provider.get_stage_checkpoint(StageId::Finish)?.map(|ch| ch.block_number);
209        if latest_block_number.unwrap_or_default() >= self.to {
210            info!(target: "reth::cli", latest = latest_block_number, "Nothing to run");
211            return Ok(())
212        }
213
214        ctx.task_executor.spawn_critical(
215            "events task",
216            reth_node_events::node::handle_events(
217                Some(Box::new(network)),
218                latest_block_number,
219                pipeline.events().map(Into::<NodeEvent<N::Primitives>>::into),
220            ),
221        );
222
223        let mut current_max_block = latest_block_number.unwrap_or_default();
224        while current_max_block < self.to {
225            let next_block = current_max_block + 1;
226            let target_block = self.to.min(current_max_block + self.interval);
227            let target_block_hash =
228                self.fetch_block_hash(fetch_client.clone(), target_block).await?;
229
230            // Run the pipeline
231            info!(target: "reth::cli", from = next_block, to = target_block, tip = ?target_block_hash, "Starting pipeline");
232            pipeline.set_tip(target_block_hash);
233            let result = pipeline.run_loop().await?;
234            trace!(target: "reth::cli", from = next_block, to = target_block, tip = ?target_block_hash, ?result, "Pipeline finished");
235
236            // Unwind the pipeline without committing.
237            provider_factory.provider_rw()?.unwind_trie_state_range(next_block..=target_block)?;
238
239            // Update latest block
240            current_max_block = target_block;
241        }
242
243        Ok(())
244    }
245}