1use crate::{Cli, Commands};
2use eyre::{eyre, Result};
3use reth_cli::chainspec::ChainSpecParser;
4use reth_cli_commands::launcher::Launcher;
5use reth_cli_runner::CliRunner;
6use reth_node_metrics::recorder::install_prometheus_recorder;
7use reth_optimism_chainspec::OpChainSpec;
8use reth_optimism_consensus::OpBeaconConsensus;
9use reth_optimism_node::{OpExecutorProvider, OpNode};
10use reth_rpc_server_types::RpcModuleValidator;
11use reth_tracing::{FileWorkerGuard, Layers};
12use reth_tracing_otlp::OtlpProtocol;
13use std::{fmt, sync::Arc};
14use tracing::info;
15use url::Url;
16
17#[derive(Debug)]
19pub struct CliApp<Spec: ChainSpecParser, Ext: clap::Args + fmt::Debug, Rpc: RpcModuleValidator> {
20 cli: Cli<Spec, Ext, Rpc>,
21 runner: Option<CliRunner>,
22 layers: Option<Layers>,
23 guard: Option<FileWorkerGuard>,
24}
25
26impl<C, Ext, Rpc> CliApp<C, Ext, Rpc>
27where
28 C: ChainSpecParser<ChainSpec = OpChainSpec>,
29 Ext: clap::Args + fmt::Debug,
30 Rpc: RpcModuleValidator,
31{
32 pub(crate) fn new(cli: Cli<C, Ext, Rpc>) -> Self {
33 Self { cli, runner: None, layers: Some(Layers::new()), guard: None }
34 }
35
36 pub fn set_runner(&mut self, runner: CliRunner) {
40 self.runner = Some(runner);
41 }
42
43 pub fn access_tracing_layers(&mut self) -> Result<&mut Layers> {
48 self.layers.as_mut().ok_or_else(|| eyre!("Tracing already initialized"))
49 }
50
51 pub fn run(mut self, launcher: impl Launcher<C, Ext>) -> Result<()> {
56 let runner = match self.runner.take() {
57 Some(runner) => runner,
58 None => CliRunner::try_default_runtime()?,
59 };
60
61 if let Some(chain_spec) = self.cli.command.chain_spec() {
64 self.cli.logs.log_file_directory =
65 self.cli.logs.log_file_directory.join(chain_spec.chain.to_string());
66 }
67
68 self.init_tracing(&runner)?;
69
70 let _ = install_prometheus_recorder();
72
73 let components = |spec: Arc<OpChainSpec>| {
74 (OpExecutorProvider::optimism(spec.clone()), Arc::new(OpBeaconConsensus::new(spec)))
75 };
76
77 match self.cli.command {
78 Commands::Node(command) => {
79 if let Some(http_api) = &command.rpc.http_api {
81 Rpc::validate_selection(http_api, "http.api").map_err(|e| eyre!("{e}"))?;
82 }
83 if let Some(ws_api) = &command.rpc.ws_api {
84 Rpc::validate_selection(ws_api, "ws.api").map_err(|e| eyre!("{e}"))?;
85 }
86
87 runner.run_command_until_exit(|ctx| command.execute(ctx, launcher))
88 }
89 Commands::Init(command) => {
90 runner.run_blocking_until_ctrl_c(command.execute::<OpNode>())
91 }
92 Commands::InitState(command) => {
93 runner.run_blocking_until_ctrl_c(command.execute::<OpNode>())
94 }
95 Commands::ImportOp(command) => {
96 runner.run_blocking_until_ctrl_c(command.execute::<OpNode>())
97 }
98 Commands::ImportReceiptsOp(command) => {
99 runner.run_blocking_until_ctrl_c(command.execute::<OpNode>())
100 }
101 Commands::DumpGenesis(command) => runner.run_blocking_until_ctrl_c(command.execute()),
102 Commands::Db(command) => runner.run_blocking_until_ctrl_c(command.execute::<OpNode>()),
103 Commands::Stage(command) => {
104 runner.run_command_until_exit(|ctx| command.execute::<OpNode, _>(ctx, components))
105 }
106 Commands::P2P(command) => runner.run_until_ctrl_c(command.execute::<OpNode>()),
107 Commands::Config(command) => runner.run_until_ctrl_c(command.execute()),
108 Commands::Prune(command) => runner.run_until_ctrl_c(command.execute::<OpNode>()),
109 #[cfg(feature = "dev")]
110 Commands::TestVectors(command) => runner.run_until_ctrl_c(command.execute()),
111 Commands::ReExecute(command) => {
112 runner.run_until_ctrl_c(command.execute::<OpNode>(components))
113 }
114 }
115 }
116
117 pub fn init_tracing(&mut self, runner: &CliRunner) -> Result<()> {
122 if self.guard.is_none() {
123 let mut layers = self.layers.take().unwrap_or_default();
124
125 #[cfg(feature = "otlp")]
126 {
127 self.cli.traces.validate()?;
128 if let Some(endpoint) = &self.cli.traces.otlp {
129 info!(target: "reth::cli", "Starting OTLP tracing export to {:?}", endpoint);
130 self.init_otlp_export(&mut layers, endpoint, runner)?;
131 }
132 }
133
134 self.guard = self.cli.logs.init_tracing_with_layers(layers)?;
135 info!(target: "reth::cli", "Initialized tracing, debug log directory: {}", self.cli.logs.log_file_directory);
136 }
137 Ok(())
138 }
139
140 #[cfg(feature = "otlp")]
145 fn init_otlp_export(
146 &self,
147 layers: &mut Layers,
148 endpoint: &Url,
149 runner: &CliRunner,
150 ) -> Result<()> {
151 let endpoint = endpoint.clone();
152 let protocol = self.cli.traces.protocol;
153 let level_filter = self.cli.traces.otlp_filter.clone();
154
155 match protocol {
156 OtlpProtocol::Grpc => {
157 runner.block_on(async {
158 layers.with_span_layer("reth".to_string(), endpoint, level_filter, protocol)
159 })?;
160 }
161 OtlpProtocol::Http => {
162 layers.with_span_layer("reth".to_string(), endpoint, level_filter, protocol)?;
163 }
164 }
165
166 Ok(())
167 }
168}