reth_optimism_cli/
app.rs

1use crate::{Cli, Commands};
2use eyre::{eyre, Result};
3use reth_cli::chainspec::ChainSpecParser;
4use reth_cli_commands::launcher::Launcher;
5use reth_cli_runner::CliRunner;
6use reth_node_metrics::recorder::install_prometheus_recorder;
7use reth_optimism_chainspec::OpChainSpec;
8use reth_optimism_consensus::OpBeaconConsensus;
9use reth_optimism_node::{OpExecutorProvider, OpNode};
10use reth_rpc_server_types::RpcModuleValidator;
11use reth_tracing::{FileWorkerGuard, Layers};
12use reth_tracing_otlp::OtlpProtocol;
13use std::{fmt, sync::Arc};
14use tracing::info;
15use url::Url;
16
17/// A wrapper around a parsed CLI that handles command execution.
18#[derive(Debug)]
19pub struct CliApp<Spec: ChainSpecParser, Ext: clap::Args + fmt::Debug, Rpc: RpcModuleValidator> {
20    cli: Cli<Spec, Ext, Rpc>,
21    runner: Option<CliRunner>,
22    layers: Option<Layers>,
23    guard: Option<FileWorkerGuard>,
24}
25
26impl<C, Ext, Rpc> CliApp<C, Ext, Rpc>
27where
28    C: ChainSpecParser<ChainSpec = OpChainSpec>,
29    Ext: clap::Args + fmt::Debug,
30    Rpc: RpcModuleValidator,
31{
32    pub(crate) fn new(cli: Cli<C, Ext, Rpc>) -> Self {
33        Self { cli, runner: None, layers: Some(Layers::new()), guard: None }
34    }
35
36    /// Sets the runner for the CLI commander.
37    ///
38    /// This replaces any existing runner with the provided one.
39    pub fn set_runner(&mut self, runner: CliRunner) {
40        self.runner = Some(runner);
41    }
42
43    /// Access to tracing layers.
44    ///
45    /// Returns a mutable reference to the tracing layers, or error
46    /// if tracing initialized and layers have detached already.
47    pub fn access_tracing_layers(&mut self) -> Result<&mut Layers> {
48        self.layers.as_mut().ok_or_else(|| eyre!("Tracing already initialized"))
49    }
50
51    /// Execute the configured cli command.
52    ///
53    /// This accepts a closure that is used to launch the node via the
54    /// [`NodeCommand`](reth_cli_commands::node::NodeCommand).
55    pub fn run(mut self, launcher: impl Launcher<C, Ext>) -> Result<()> {
56        let runner = match self.runner.take() {
57            Some(runner) => runner,
58            None => CliRunner::try_default_runtime()?,
59        };
60
61        // add network name to logs dir
62        // Add network name if available to the logs dir
63        if let Some(chain_spec) = self.cli.command.chain_spec() {
64            self.cli.logs.log_file_directory =
65                self.cli.logs.log_file_directory.join(chain_spec.chain.to_string());
66        }
67
68        self.init_tracing(&runner)?;
69
70        // Install the prometheus recorder to be sure to record all metrics
71        let _ = install_prometheus_recorder();
72
73        let components = |spec: Arc<OpChainSpec>| {
74            (OpExecutorProvider::optimism(spec.clone()), Arc::new(OpBeaconConsensus::new(spec)))
75        };
76
77        match self.cli.command {
78            Commands::Node(command) => {
79                // Validate RPC modules using the configured validator
80                if let Some(http_api) = &command.rpc.http_api {
81                    Rpc::validate_selection(http_api, "http.api").map_err(|e| eyre!("{e}"))?;
82                }
83                if let Some(ws_api) = &command.rpc.ws_api {
84                    Rpc::validate_selection(ws_api, "ws.api").map_err(|e| eyre!("{e}"))?;
85                }
86
87                runner.run_command_until_exit(|ctx| command.execute(ctx, launcher))
88            }
89            Commands::Init(command) => {
90                runner.run_blocking_until_ctrl_c(command.execute::<OpNode>())
91            }
92            Commands::InitState(command) => {
93                runner.run_blocking_until_ctrl_c(command.execute::<OpNode>())
94            }
95            Commands::ImportOp(command) => {
96                runner.run_blocking_until_ctrl_c(command.execute::<OpNode>())
97            }
98            Commands::ImportReceiptsOp(command) => {
99                runner.run_blocking_until_ctrl_c(command.execute::<OpNode>())
100            }
101            Commands::DumpGenesis(command) => runner.run_blocking_until_ctrl_c(command.execute()),
102            Commands::Db(command) => runner.run_blocking_until_ctrl_c(command.execute::<OpNode>()),
103            Commands::Stage(command) => {
104                runner.run_command_until_exit(|ctx| command.execute::<OpNode, _>(ctx, components))
105            }
106            Commands::P2P(command) => runner.run_until_ctrl_c(command.execute::<OpNode>()),
107            Commands::Config(command) => runner.run_until_ctrl_c(command.execute()),
108            Commands::Prune(command) => runner.run_until_ctrl_c(command.execute::<OpNode>()),
109            #[cfg(feature = "dev")]
110            Commands::TestVectors(command) => runner.run_until_ctrl_c(command.execute()),
111            Commands::ReExecute(command) => {
112                runner.run_until_ctrl_c(command.execute::<OpNode>(components))
113            }
114        }
115    }
116
117    /// Initializes tracing with the configured options.
118    ///
119    /// If file logging is enabled, this function stores guard to the struct.
120    /// For gRPC OTLP, it requires tokio runtime context.
121    pub fn init_tracing(&mut self, runner: &CliRunner) -> Result<()> {
122        if self.guard.is_none() {
123            let mut layers = self.layers.take().unwrap_or_default();
124
125            #[cfg(feature = "otlp")]
126            {
127                self.cli.traces.validate()?;
128                if let Some(endpoint) = &self.cli.traces.otlp {
129                    info!(target: "reth::cli", "Starting OTLP tracing export to {:?}", endpoint);
130                    self.init_otlp_export(&mut layers, endpoint, runner)?;
131                }
132            }
133
134            self.guard = self.cli.logs.init_tracing_with_layers(layers)?;
135            info!(target: "reth::cli", "Initialized tracing, debug log directory: {}", self.cli.logs.log_file_directory);
136        }
137        Ok(())
138    }
139
140    /// Initialize OTLP tracing export based on protocol type.
141    ///
142    /// For gRPC, `block_on` is required because tonic's channel initialization needs
143    /// a tokio runtime context, even though `with_span_layer` itself is not async.
144    #[cfg(feature = "otlp")]
145    fn init_otlp_export(
146        &self,
147        layers: &mut Layers,
148        endpoint: &Url,
149        runner: &CliRunner,
150    ) -> Result<()> {
151        let endpoint = endpoint.clone();
152        let protocol = self.cli.traces.protocol;
153        let level_filter = self.cli.traces.otlp_filter.clone();
154
155        match protocol {
156            OtlpProtocol::Grpc => {
157                runner.block_on(async {
158                    layers.with_span_layer("reth".to_string(), endpoint, level_filter, protocol)
159                })?;
160            }
161            OtlpProtocol::Http => {
162                layers.with_span_layer("reth".to_string(), endpoint, level_filter, protocol)?;
163            }
164        }
165
166        Ok(())
167    }
168}