reth_ethereum_cli/
app.rs

1use crate::{interface::Commands, Cli};
2use eyre::{eyre, Result};
3use reth_chainspec::{ChainSpec, EthChainSpec, Hardforks};
4use reth_cli::chainspec::ChainSpecParser;
5use reth_cli_commands::{
6    common::{CliComponentsBuilder, CliHeader, CliNodeTypes},
7    launcher::{FnLauncher, Launcher},
8};
9use reth_cli_runner::CliRunner;
10use reth_db::DatabaseEnv;
11use reth_node_api::NodePrimitives;
12use reth_node_builder::{NodeBuilder, WithLaunchContext};
13use reth_node_ethereum::{consensus::EthBeaconConsensus, EthEvmConfig, EthereumNode};
14use reth_node_metrics::recorder::install_prometheus_recorder;
15use reth_rpc_server_types::RpcModuleValidator;
16use reth_tracing::{FileWorkerGuard, Layers};
17use reth_tracing_otlp::OtlpProtocol;
18use std::{fmt, sync::Arc};
19use tracing::info;
20use url::Url;
21
22/// A wrapper around a parsed CLI that handles command execution.
23#[derive(Debug)]
24pub struct CliApp<Spec: ChainSpecParser, Ext: clap::Args + fmt::Debug, Rpc: RpcModuleValidator> {
25    cli: Cli<Spec, Ext, Rpc>,
26    runner: Option<CliRunner>,
27    layers: Option<Layers>,
28    guard: Option<FileWorkerGuard>,
29}
30
31impl<C, Ext, Rpc> CliApp<C, Ext, Rpc>
32where
33    C: ChainSpecParser,
34    Ext: clap::Args + fmt::Debug,
35    Rpc: RpcModuleValidator,
36{
37    pub(crate) fn new(cli: Cli<C, Ext, Rpc>) -> Self {
38        Self { cli, runner: None, layers: Some(Layers::new()), guard: None }
39    }
40
41    /// Sets the runner for the CLI commander.
42    ///
43    /// This replaces any existing runner with the provided one.
44    pub fn set_runner(&mut self, runner: CliRunner) {
45        self.runner = Some(runner);
46    }
47
48    /// Access to tracing layers.
49    ///
50    /// Returns a mutable reference to the tracing layers, or error
51    /// if tracing initialized and layers have detached already.
52    pub fn access_tracing_layers(&mut self) -> Result<&mut Layers> {
53        self.layers.as_mut().ok_or_else(|| eyre!("Tracing already initialized"))
54    }
55
56    /// Execute the configured cli command.
57    ///
58    /// This accepts a closure that is used to launch the node via the
59    /// [`NodeCommand`](reth_cli_commands::node::NodeCommand).
60    pub fn run(self, launcher: impl Launcher<C, Ext>) -> Result<()>
61    where
62        C: ChainSpecParser<ChainSpec = ChainSpec>,
63    {
64        let components = |spec: Arc<ChainSpec>| {
65            (EthEvmConfig::ethereum(spec.clone()), Arc::new(EthBeaconConsensus::new(spec)))
66        };
67
68        self.run_with_components::<EthereumNode>(components, |builder, ext| async move {
69            launcher.entrypoint(builder, ext).await
70        })
71    }
72
73    /// Execute the configured cli command with the provided [`CliComponentsBuilder`].
74    ///
75    /// This accepts a closure that is used to launch the node via the
76    /// [`NodeCommand`](reth_cli_commands::node::NodeCommand) and allows providing custom
77    /// components.
78    pub fn run_with_components<N>(
79        mut self,
80        components: impl CliComponentsBuilder<N>,
81        launcher: impl AsyncFnOnce(
82            WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>,
83            Ext,
84        ) -> Result<()>,
85    ) -> Result<()>
86    where
87        N: CliNodeTypes<Primitives: NodePrimitives<BlockHeader: CliHeader>, ChainSpec: Hardforks>,
88        C: ChainSpecParser<ChainSpec = N::ChainSpec>,
89    {
90        let runner = match self.runner.take() {
91            Some(runner) => runner,
92            None => CliRunner::try_default_runtime()?,
93        };
94
95        // Add network name if available to the logs dir
96        if let Some(chain_spec) = self.cli.command.chain_spec() {
97            self.cli.logs.log_file_directory =
98                self.cli.logs.log_file_directory.join(chain_spec.chain().to_string());
99        }
100
101        self.init_tracing(&runner)?;
102
103        // Install the prometheus recorder to be sure to record all metrics
104        let _ = install_prometheus_recorder();
105
106        run_commands_with::<C, Ext, Rpc, N>(self.cli, runner, components, launcher)
107    }
108
109    /// Initializes tracing with the configured options.
110    ///
111    /// If file logging is enabled, this function stores guard to the struct.
112    /// For gRPC OTLP, it requires tokio runtime context.
113    pub fn init_tracing(&mut self, runner: &CliRunner) -> Result<()> {
114        if self.guard.is_none() {
115            let mut layers = self.layers.take().unwrap_or_default();
116
117            #[cfg(feature = "otlp")]
118            {
119                self.cli.traces.validate()?;
120
121                if let Some(endpoint) = &self.cli.traces.otlp {
122                    info!(target: "reth::cli", "Starting OTLP tracing export to {:?}", endpoint);
123                    self.init_otlp_export(&mut layers, endpoint, runner)?;
124                }
125            }
126
127            self.guard = self.cli.logs.init_tracing_with_layers(layers)?;
128            info!(target: "reth::cli", "Initialized tracing, debug log directory: {}", self.cli.logs.log_file_directory);
129        }
130        Ok(())
131    }
132
133    /// Initialize OTLP tracing export based on protocol type.
134    ///
135    /// For gRPC, `block_on` is required because tonic's channel initialization needs
136    /// a tokio runtime context, even though `with_span_layer` itself is not async.
137    #[cfg(feature = "otlp")]
138    fn init_otlp_export(
139        &self,
140        layers: &mut Layers,
141        endpoint: &Url,
142        runner: &CliRunner,
143    ) -> Result<()> {
144        let endpoint = endpoint.clone();
145        let protocol = self.cli.traces.protocol;
146        let filter_level = self.cli.traces.otlp_filter.clone();
147
148        match protocol {
149            OtlpProtocol::Grpc => {
150                runner.block_on(async {
151                    layers.with_span_layer("reth".to_string(), endpoint, filter_level, protocol)
152                })?;
153            }
154            OtlpProtocol::Http => {
155                layers.with_span_layer("reth".to_string(), endpoint, filter_level, protocol)?;
156            }
157        }
158
159        Ok(())
160    }
161}
162
163/// Run CLI commands with the provided runner, components and launcher.
164/// This is the shared implementation used by both `CliApp` and Cli methods.
165pub(crate) fn run_commands_with<C, Ext, Rpc, N>(
166    cli: Cli<C, Ext, Rpc>,
167    runner: CliRunner,
168    components: impl CliComponentsBuilder<N>,
169    launcher: impl AsyncFnOnce(
170        WithLaunchContext<NodeBuilder<Arc<DatabaseEnv>, C::ChainSpec>>,
171        Ext,
172    ) -> Result<()>,
173) -> Result<()>
174where
175    C: ChainSpecParser<ChainSpec = N::ChainSpec>,
176    Ext: clap::Args + fmt::Debug,
177    Rpc: RpcModuleValidator,
178    N: CliNodeTypes<Primitives: NodePrimitives<BlockHeader: CliHeader>, ChainSpec: Hardforks>,
179{
180    match cli.command {
181        Commands::Node(command) => {
182            // Validate RPC modules using the configured validator
183            if let Some(http_api) = &command.rpc.http_api {
184                Rpc::validate_selection(http_api, "http.api").map_err(|e| eyre!("{e}"))?;
185            }
186            if let Some(ws_api) = &command.rpc.ws_api {
187                Rpc::validate_selection(ws_api, "ws.api").map_err(|e| eyre!("{e}"))?;
188            }
189
190            runner.run_command_until_exit(|ctx| {
191                command.execute(ctx, FnLauncher::new::<C, Ext>(launcher))
192            })
193        }
194        Commands::Init(command) => runner.run_blocking_until_ctrl_c(command.execute::<N>()),
195        Commands::InitState(command) => runner.run_blocking_until_ctrl_c(command.execute::<N>()),
196        Commands::Import(command) => {
197            runner.run_blocking_until_ctrl_c(command.execute::<N, _>(components))
198        }
199        Commands::ImportEra(command) => runner.run_blocking_until_ctrl_c(command.execute::<N>()),
200        Commands::ExportEra(command) => runner.run_blocking_until_ctrl_c(command.execute::<N>()),
201        Commands::DumpGenesis(command) => runner.run_blocking_until_ctrl_c(command.execute()),
202        Commands::Db(command) => runner.run_blocking_until_ctrl_c(command.execute::<N>()),
203        Commands::Download(command) => runner.run_blocking_until_ctrl_c(command.execute::<N>()),
204        Commands::Stage(command) => {
205            runner.run_command_until_exit(|ctx| command.execute::<N, _>(ctx, components))
206        }
207        Commands::P2P(command) => runner.run_until_ctrl_c(command.execute::<N>()),
208        Commands::Config(command) => runner.run_until_ctrl_c(command.execute()),
209        Commands::Prune(command) => runner.run_until_ctrl_c(command.execute::<N>()),
210        #[cfg(feature = "dev")]
211        Commands::TestVectors(command) => runner.run_until_ctrl_c(command.execute()),
212        Commands::ReExecute(command) => runner.run_until_ctrl_c(command.execute::<N>(components)),
213    }
214}
215
216#[cfg(test)]
217mod tests {
218    use super::*;
219    use crate::chainspec::EthereumChainSpecParser;
220    use clap::Parser;
221    use reth_cli_commands::node::NoArgs;
222
223    #[test]
224    fn test_cli_app_creation() {
225        let args = vec!["reth", "config"];
226        let cli = Cli::<EthereumChainSpecParser, NoArgs>::try_parse_from(args).unwrap();
227        let app = cli.configure();
228
229        // Verify app is created correctly
230        assert!(app.runner.is_none());
231        assert!(app.layers.is_some());
232        assert!(app.guard.is_none());
233    }
234
235    #[test]
236    fn test_set_runner() {
237        let args = vec!["reth", "config"];
238        let cli = Cli::<EthereumChainSpecParser, NoArgs>::try_parse_from(args).unwrap();
239        let mut app = cli.configure();
240
241        // Create and set a runner
242        if let Ok(runner) = CliRunner::try_default_runtime() {
243            app.set_runner(runner);
244            assert!(app.runner.is_some());
245        }
246    }
247
248    #[test]
249    fn test_access_tracing_layers() {
250        let args = vec!["reth", "config"];
251        let cli = Cli::<EthereumChainSpecParser, NoArgs>::try_parse_from(args).unwrap();
252        let mut app = cli.configure();
253
254        // Should be able to access layers before initialization
255        assert!(app.access_tracing_layers().is_ok());
256
257        // After taking layers (simulating initialization), access should error
258        app.layers = None;
259        assert!(app.access_tracing_layers().is_err());
260    }
261}