1#![doc(
4 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg))]
10
11use reth_tasks::{PanickedTaskError, TaskExecutor};
14use std::{future::Future, pin::pin, sync::mpsc, time::Duration};
15use tokio::task::JoinHandle;
16use tracing::{debug, error, info};
17
/// Runs CLI commands on top of a [`reth_tasks::Runtime`].
///
/// The runner owns the runtime; the `run_*` methods consume the runner, so each
/// runner drives at most one command.
#[derive(Debug)]
pub struct CliRunner {
    /// Runner settings; supplies the timeout used for the graceful shutdown.
    config: CliRunnerConfig,
    /// Runtime on which all futures handed to this runner are driven.
    runtime: reth_tasks::Runtime,
}
26
27impl CliRunner {
28 pub fn try_default_runtime() -> Result<Self, reth_tasks::RuntimeBuildError> {
33 Self::try_with_runtime_config(reth_tasks::RuntimeConfig::default())
34 }
35
36 pub fn try_with_runtime_config(
38 config: reth_tasks::RuntimeConfig,
39 ) -> Result<Self, reth_tasks::RuntimeBuildError> {
40 let runtime = reth_tasks::RuntimeBuilder::new(config).build()?;
41 Ok(Self { config: CliRunnerConfig::default(), runtime })
42 }
43
44 pub const fn with_config(mut self, config: CliRunnerConfig) -> Self {
46 self.config = config;
47 self
48 }
49
50 pub fn runtime(&self) -> reth_tasks::Runtime {
52 self.runtime.clone()
53 }
54
55 pub fn block_on<F, T>(&self, fut: F) -> T
57 where
58 F: Future<Output = T>,
59 {
60 self.runtime.handle().block_on(fut)
61 }
62
63 pub fn run_command_until_exit<F, E>(
69 self,
70 command: impl FnOnce(CliContext) -> F,
71 ) -> Result<(), E>
72 where
73 F: Future<Output = Result<(), E>>,
74 E: Send
75 + Sync
76 + std::fmt::Display
77 + From<std::io::Error>
78 + From<reth_tasks::PanickedTaskError>
79 + 'static,
80 {
81 let (context, task_manager_handle) = cli_context(&self.runtime);
82
83 let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
85 task_manager_handle,
86 run_until_ctrl_c(command(context)),
87 ));
88
89 if let Err(err) = &command_res {
90 error!(target: "reth::cli", %err, "shutting down due to error");
91 } else {
92 debug!(target: "reth::cli", "shutting down gracefully");
93 self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
97 }
98
99 runtime_shutdown(self.runtime, true);
100
101 command_res
102 }
103
104 pub fn run_blocking_command_until_exit<F, E>(
108 self,
109 command: impl FnOnce(CliContext) -> F + Send + 'static,
110 ) -> Result<(), E>
111 where
112 F: Future<Output = Result<(), E>> + Send + 'static,
113 E: Send
114 + Sync
115 + std::fmt::Display
116 + From<std::io::Error>
117 + From<reth_tasks::PanickedTaskError>
118 + 'static,
119 {
120 let (context, task_manager_handle) = cli_context(&self.runtime);
121
122 let handle = self.runtime.handle().clone();
124 let handle2 = handle.clone();
125 let command_handle = handle.spawn_blocking(move || handle2.block_on(command(context)));
126
127 let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
129 task_manager_handle,
130 run_until_ctrl_c(
131 async move { command_handle.await.expect("Failed to join blocking task") },
132 ),
133 ));
134
135 if let Err(err) = &command_res {
136 error!(target: "reth::cli", %err, "shutting down due to error");
137 } else {
138 debug!(target: "reth::cli", "shutting down gracefully");
139 self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
140 }
141
142 runtime_shutdown(self.runtime, true);
143
144 command_res
145 }
146
147 pub fn run_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
149 where
150 F: Future<Output = Result<(), E>>,
151 E: Send + Sync + From<std::io::Error> + 'static,
152 {
153 self.runtime.handle().block_on(run_until_ctrl_c(fut))?;
154 Ok(())
155 }
156
157 pub fn run_blocking_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
162 where
163 F: Future<Output = Result<(), E>> + Send + 'static,
164 E: Send + Sync + From<std::io::Error> + 'static,
165 {
166 let handle = self.runtime.handle().clone();
167 let handle2 = handle.clone();
168 let fut = handle.spawn_blocking(move || handle2.block_on(fut));
169 self.runtime
170 .handle()
171 .block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
172
173 runtime_shutdown(self.runtime, false);
174
175 Ok(())
176 }
177}
178
179fn cli_context(
181 runtime: &reth_tasks::Runtime,
182) -> (CliContext, JoinHandle<Result<(), PanickedTaskError>>) {
183 let handle =
184 runtime.take_task_manager_handle().expect("Runtime must contain a TaskManager handle");
185 let context = CliContext { task_executor: runtime.clone() };
186 (context, handle)
187}
188
/// Context passed to the command closures executed by [`CliRunner`].
#[derive(Debug)]
pub struct CliContext {
    /// Task executor handed to the command; `cli_context` fills it with a clone of
    /// the runner's runtime.
    pub task_executor: TaskExecutor,
}
195
/// Graceful shutdown timeout used when none is configured explicitly.
const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);

/// Settings for the CLI runner.
#[derive(Debug, Clone)]
pub struct CliRunnerConfig {
    /// Timeout handed to the runtime's graceful shutdown after a command succeeded.
    pub graceful_shutdown_timeout: Duration,
}

impl Default for CliRunnerConfig {
    /// Equivalent to [`CliRunnerConfig::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl CliRunnerConfig {
    /// Creates a configuration with the default graceful shutdown timeout (5 seconds).
    pub const fn new() -> Self {
        let graceful_shutdown_timeout = DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT;
        Self { graceful_shutdown_timeout }
    }

    /// Returns the configuration with the graceful shutdown timeout set to `timeout`.
    pub const fn with_graceful_shutdown_timeout(mut self, timeout: Duration) -> Self {
        self.graceful_shutdown_timeout = timeout;
        self
    }
}
227
228async fn run_to_completion_or_panic<F, E>(
232 task_manager_handle: JoinHandle<Result<(), PanickedTaskError>>,
233 fut: F,
234) -> Result<(), E>
235where
236 F: Future<Output = Result<(), E>>,
237 E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
238{
239 let fut = pin!(fut);
240 tokio::select! {
241 task_manager_result = task_manager_handle => {
242 if let Ok(Err(panicked_error)) = task_manager_result {
243 return Err(panicked_error.into());
244 }
245 },
246 res = fut => res?,
247 }
248 Ok(())
249}
250
251async fn run_until_ctrl_c<F, E>(fut: F) -> Result<(), E>
255where
256 F: Future<Output = Result<(), E>>,
257 E: Send + Sync + 'static + From<std::io::Error>,
258{
259 let ctrl_c = tokio::signal::ctrl_c();
260
261 #[cfg(unix)]
262 {
263 let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
264 let sigterm = stream.recv();
265 let sigterm = pin!(sigterm);
266 let ctrl_c = pin!(ctrl_c);
267 let fut = pin!(fut);
268
269 tokio::select! {
270 _ = ctrl_c => {
271 info!(target: "reth::cli", "Received ctrl-c");
272 },
273 _ = sigterm => {
274 info!(target: "reth::cli", "Received SIGTERM");
275 },
276 res = fut => res?,
277 }
278 }
279
280 #[cfg(not(unix))]
281 {
282 let ctrl_c = pin!(ctrl_c);
283 let fut = pin!(fut);
284
285 tokio::select! {
286 _ = ctrl_c => {
287 info!(target: "reth::cli", "Received ctrl-c");
288 },
289 res = fut => res?,
290 }
291 }
292
293 Ok(())
294}
295
296const DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
298
299fn runtime_shutdown(rt: reth_tasks::Runtime, wait: bool) {
304 let (tx, rx) = mpsc::channel();
305 std::thread::Builder::new()
306 .name("rt-shutdown".to_string())
307 .spawn(move || {
308 drop(rt);
309 let _ = tx.send(());
310 })
311 .unwrap();
312
313 if wait {
314 let _ = rx.recv_timeout(DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT).inspect_err(|err| {
315 tracing::warn!(target: "reth::cli", %err, "runtime shutdown timed out");
316 });
317 }
318}