1#![doc(
4 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg))]
10
11use reth_tasks::{PanickedTaskError, TaskExecutor};
14use std::{future::Future, pin::pin, sync::mpsc, time::Duration};
15use tokio::task::JoinHandle;
16use tracing::{debug, error, info};
17
18#[derive(Debug)]
22pub struct CliRunner {
23 config: CliRunnerConfig,
24 runtime: reth_tasks::Runtime,
25}
26
27impl CliRunner {
28 pub fn try_default_runtime() -> Result<Self, reth_tasks::RuntimeBuildError> {
33 Self::try_with_runtime_config(reth_tasks::RuntimeConfig::default())
34 }
35
36 pub fn try_with_runtime_config(
38 config: reth_tasks::RuntimeConfig,
39 ) -> Result<Self, reth_tasks::RuntimeBuildError> {
40 let runtime = reth_tasks::RuntimeBuilder::new(config).build()?;
41 Ok(Self { config: CliRunnerConfig::default(), runtime })
42 }
43
44 pub const fn with_config(mut self, config: CliRunnerConfig) -> Self {
46 self.config = config;
47 self
48 }
49
50 pub fn block_on<F, T>(&self, fut: F) -> T
52 where
53 F: Future<Output = T>,
54 {
55 self.runtime.handle().block_on(fut)
56 }
57
58 pub fn run_command_until_exit<F, E>(
64 self,
65 command: impl FnOnce(CliContext) -> F,
66 ) -> Result<(), E>
67 where
68 F: Future<Output = Result<(), E>>,
69 E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
70 {
71 let (context, task_manager_handle) = cli_context(&self.runtime);
72
73 let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
75 task_manager_handle,
76 run_until_ctrl_c(command(context)),
77 ));
78
79 if command_res.is_err() {
80 error!(target: "reth::cli", "shutting down due to error");
81 } else {
82 debug!(target: "reth::cli", "shutting down gracefully");
83 self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
87 }
88
89 runtime_shutdown(self.runtime, true);
90
91 command_res
92 }
93
94 pub fn run_blocking_command_until_exit<F, E>(
98 self,
99 command: impl FnOnce(CliContext) -> F + Send + 'static,
100 ) -> Result<(), E>
101 where
102 F: Future<Output = Result<(), E>> + Send + 'static,
103 E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
104 {
105 let (context, task_manager_handle) = cli_context(&self.runtime);
106
107 let handle = self.runtime.handle().clone();
109 let handle2 = handle.clone();
110 let command_handle = handle.spawn_blocking(move || handle2.block_on(command(context)));
111
112 let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
114 task_manager_handle,
115 run_until_ctrl_c(
116 async move { command_handle.await.expect("Failed to join blocking task") },
117 ),
118 ));
119
120 if command_res.is_err() {
121 error!(target: "reth::cli", "shutting down due to error");
122 } else {
123 debug!(target: "reth::cli", "shutting down gracefully");
124 self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
125 }
126
127 runtime_shutdown(self.runtime, true);
128
129 command_res
130 }
131
132 pub fn run_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
134 where
135 F: Future<Output = Result<(), E>>,
136 E: Send + Sync + From<std::io::Error> + 'static,
137 {
138 self.runtime.handle().block_on(run_until_ctrl_c(fut))?;
139 Ok(())
140 }
141
142 pub fn run_blocking_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
147 where
148 F: Future<Output = Result<(), E>> + Send + 'static,
149 E: Send + Sync + From<std::io::Error> + 'static,
150 {
151 let handle = self.runtime.handle().clone();
152 let handle2 = handle.clone();
153 let fut = handle.spawn_blocking(move || handle2.block_on(fut));
154 self.runtime
155 .handle()
156 .block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
157
158 runtime_shutdown(self.runtime, false);
159
160 Ok(())
161 }
162}
163
164fn cli_context(
166 runtime: &reth_tasks::Runtime,
167) -> (CliContext, JoinHandle<Result<(), PanickedTaskError>>) {
168 let handle =
169 runtime.take_task_manager_handle().expect("Runtime must contain a TaskManager handle");
170 let context = CliContext { task_executor: runtime.clone() };
171 (context, handle)
172}
173
174#[derive(Debug)]
176pub struct CliContext {
177 pub task_executor: TaskExecutor,
179}
180
181const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
183
184#[derive(Debug, Clone)]
186pub struct CliRunnerConfig {
187 pub graceful_shutdown_timeout: Duration,
192}
193
194impl Default for CliRunnerConfig {
195 fn default() -> Self {
196 Self::new()
197 }
198}
199
200impl CliRunnerConfig {
201 pub const fn new() -> Self {
203 Self { graceful_shutdown_timeout: DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT }
204 }
205
206 pub const fn with_graceful_shutdown_timeout(mut self, timeout: Duration) -> Self {
208 self.graceful_shutdown_timeout = timeout;
209 self
210 }
211}
212
213async fn run_to_completion_or_panic<F, E>(
217 task_manager_handle: JoinHandle<Result<(), PanickedTaskError>>,
218 fut: F,
219) -> Result<(), E>
220where
221 F: Future<Output = Result<(), E>>,
222 E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
223{
224 let fut = pin!(fut);
225 tokio::select! {
226 task_manager_result = task_manager_handle => {
227 if let Ok(Err(panicked_error)) = task_manager_result {
228 return Err(panicked_error.into());
229 }
230 },
231 res = fut => res?,
232 }
233 Ok(())
234}
235
236async fn run_until_ctrl_c<F, E>(fut: F) -> Result<(), E>
240where
241 F: Future<Output = Result<(), E>>,
242 E: Send + Sync + 'static + From<std::io::Error>,
243{
244 let ctrl_c = tokio::signal::ctrl_c();
245
246 #[cfg(unix)]
247 {
248 let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
249 let sigterm = stream.recv();
250 let sigterm = pin!(sigterm);
251 let ctrl_c = pin!(ctrl_c);
252 let fut = pin!(fut);
253
254 tokio::select! {
255 _ = ctrl_c => {
256 info!(target: "reth::cli", "Received ctrl-c");
257 },
258 _ = sigterm => {
259 info!(target: "reth::cli", "Received SIGTERM");
260 },
261 res = fut => res?,
262 }
263 }
264
265 #[cfg(not(unix))]
266 {
267 let ctrl_c = pin!(ctrl_c);
268 let fut = pin!(fut);
269
270 tokio::select! {
271 _ = ctrl_c => {
272 info!(target: "reth::cli", "Received ctrl-c");
273 },
274 res = fut => res?,
275 }
276 }
277
278 Ok(())
279}
280
281const DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
283
284fn runtime_shutdown(rt: reth_tasks::Runtime, wait: bool) {
289 let (tx, rx) = mpsc::channel();
290 std::thread::Builder::new()
291 .name("rt-shutdown".to_string())
292 .spawn(move || {
293 drop(rt);
294 let _ = tx.send(());
295 })
296 .unwrap();
297
298 if wait {
299 let _ = rx.recv_timeout(DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT).inspect_err(|err| {
300 tracing::warn!(target: "reth::cli", %err, "runtime shutdown timed out");
301 });
302 }
303}