1#![doc(
4 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg))]
10
11use reth_tasks::{PanickedTaskError, TaskExecutor};
14use std::{future::Future, pin::pin, sync::mpsc, time::Duration};
15use tokio::task::JoinHandle;
16use tracing::{debug, error, info};
17
18#[derive(Debug)]
22pub struct CliRunner {
23 config: CliRunnerConfig,
24 runtime: reth_tasks::Runtime,
25}
26
27impl CliRunner {
28 pub fn try_default_runtime() -> Result<Self, reth_tasks::RuntimeBuildError> {
33 Self::try_with_runtime_config(reth_tasks::RuntimeConfig::default())
34 }
35
36 pub fn try_with_runtime_config(
38 config: reth_tasks::RuntimeConfig,
39 ) -> Result<Self, reth_tasks::RuntimeBuildError> {
40 let runtime = reth_tasks::RuntimeBuilder::new(config).build()?;
41 Ok(Self { config: CliRunnerConfig::default(), runtime })
42 }
43
44 pub const fn with_config(mut self, config: CliRunnerConfig) -> Self {
46 self.config = config;
47 self
48 }
49
50 pub fn runtime(&self) -> reth_tasks::Runtime {
52 self.runtime.clone()
53 }
54
55 pub fn block_on<F, T>(&self, fut: F) -> T
57 where
58 F: Future<Output = T>,
59 {
60 self.runtime.handle().block_on(fut)
61 }
62
63 pub fn run_command_until_exit<F, E>(
69 self,
70 command: impl FnOnce(CliContext) -> F,
71 ) -> Result<(), E>
72 where
73 F: Future<Output = Result<(), E>>,
74 E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
75 {
76 let (context, task_manager_handle) = cli_context(&self.runtime);
77
78 let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
80 task_manager_handle,
81 run_until_ctrl_c(command(context)),
82 ));
83
84 if command_res.is_err() {
85 error!(target: "reth::cli", "shutting down due to error");
86 } else {
87 debug!(target: "reth::cli", "shutting down gracefully");
88 self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
92 }
93
94 runtime_shutdown(self.runtime, true);
95
96 command_res
97 }
98
99 pub fn run_blocking_command_until_exit<F, E>(
103 self,
104 command: impl FnOnce(CliContext) -> F + Send + 'static,
105 ) -> Result<(), E>
106 where
107 F: Future<Output = Result<(), E>> + Send + 'static,
108 E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
109 {
110 let (context, task_manager_handle) = cli_context(&self.runtime);
111
112 let handle = self.runtime.handle().clone();
114 let handle2 = handle.clone();
115 let command_handle = handle.spawn_blocking(move || handle2.block_on(command(context)));
116
117 let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
119 task_manager_handle,
120 run_until_ctrl_c(
121 async move { command_handle.await.expect("Failed to join blocking task") },
122 ),
123 ));
124
125 if command_res.is_err() {
126 error!(target: "reth::cli", "shutting down due to error");
127 } else {
128 debug!(target: "reth::cli", "shutting down gracefully");
129 self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
130 }
131
132 runtime_shutdown(self.runtime, true);
133
134 command_res
135 }
136
137 pub fn run_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
139 where
140 F: Future<Output = Result<(), E>>,
141 E: Send + Sync + From<std::io::Error> + 'static,
142 {
143 self.runtime.handle().block_on(run_until_ctrl_c(fut))?;
144 Ok(())
145 }
146
147 pub fn run_blocking_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
152 where
153 F: Future<Output = Result<(), E>> + Send + 'static,
154 E: Send + Sync + From<std::io::Error> + 'static,
155 {
156 let handle = self.runtime.handle().clone();
157 let handle2 = handle.clone();
158 let fut = handle.spawn_blocking(move || handle2.block_on(fut));
159 self.runtime
160 .handle()
161 .block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
162
163 runtime_shutdown(self.runtime, false);
164
165 Ok(())
166 }
167}
168
169fn cli_context(
171 runtime: &reth_tasks::Runtime,
172) -> (CliContext, JoinHandle<Result<(), PanickedTaskError>>) {
173 let handle =
174 runtime.take_task_manager_handle().expect("Runtime must contain a TaskManager handle");
175 let context = CliContext { task_executor: runtime.clone() };
176 (context, handle)
177}
178
179#[derive(Debug)]
181pub struct CliContext {
182 pub task_executor: TaskExecutor,
184}
185
186const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
188
189#[derive(Debug, Clone)]
191pub struct CliRunnerConfig {
192 pub graceful_shutdown_timeout: Duration,
197}
198
199impl Default for CliRunnerConfig {
200 fn default() -> Self {
201 Self::new()
202 }
203}
204
205impl CliRunnerConfig {
206 pub const fn new() -> Self {
208 Self { graceful_shutdown_timeout: DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT }
209 }
210
211 pub const fn with_graceful_shutdown_timeout(mut self, timeout: Duration) -> Self {
213 self.graceful_shutdown_timeout = timeout;
214 self
215 }
216}
217
218async fn run_to_completion_or_panic<F, E>(
222 task_manager_handle: JoinHandle<Result<(), PanickedTaskError>>,
223 fut: F,
224) -> Result<(), E>
225where
226 F: Future<Output = Result<(), E>>,
227 E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
228{
229 let fut = pin!(fut);
230 tokio::select! {
231 task_manager_result = task_manager_handle => {
232 if let Ok(Err(panicked_error)) = task_manager_result {
233 return Err(panicked_error.into());
234 }
235 },
236 res = fut => res?,
237 }
238 Ok(())
239}
240
241async fn run_until_ctrl_c<F, E>(fut: F) -> Result<(), E>
245where
246 F: Future<Output = Result<(), E>>,
247 E: Send + Sync + 'static + From<std::io::Error>,
248{
249 let ctrl_c = tokio::signal::ctrl_c();
250
251 #[cfg(unix)]
252 {
253 let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
254 let sigterm = stream.recv();
255 let sigterm = pin!(sigterm);
256 let ctrl_c = pin!(ctrl_c);
257 let fut = pin!(fut);
258
259 tokio::select! {
260 _ = ctrl_c => {
261 info!(target: "reth::cli", "Received ctrl-c");
262 },
263 _ = sigterm => {
264 info!(target: "reth::cli", "Received SIGTERM");
265 },
266 res = fut => res?,
267 }
268 }
269
270 #[cfg(not(unix))]
271 {
272 let ctrl_c = pin!(ctrl_c);
273 let fut = pin!(fut);
274
275 tokio::select! {
276 _ = ctrl_c => {
277 info!(target: "reth::cli", "Received ctrl-c");
278 },
279 res = fut => res?,
280 }
281 }
282
283 Ok(())
284}
285
286const DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
288
289fn runtime_shutdown(rt: reth_tasks::Runtime, wait: bool) {
294 let (tx, rx) = mpsc::channel();
295 std::thread::Builder::new()
296 .name("rt-shutdown".to_string())
297 .spawn(move || {
298 drop(rt);
299 let _ = tx.send(());
300 })
301 .unwrap();
302
303 if wait {
304 let _ = rx.recv_timeout(DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT).inspect_err(|err| {
305 tracing::warn!(target: "reth::cli", %err, "runtime shutdown timed out");
306 });
307 }
308}