1#![doc(
4 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg))]
10
11use reth_tasks::{TaskExecutor, TaskManager};
14use std::{future::Future, pin::pin, sync::mpsc, time::Duration};
15use tracing::{debug, error, trace};
16
17#[derive(Debug)]
21pub struct CliRunner {
22 config: CliRunnerConfig,
23 tokio_runtime: tokio::runtime::Runtime,
24}
25
26impl CliRunner {
27 pub fn try_default_runtime() -> Result<Self, std::io::Error> {
32 Ok(Self { config: CliRunnerConfig::default(), tokio_runtime: tokio_runtime()? })
33 }
34
35 pub const fn from_runtime(tokio_runtime: tokio::runtime::Runtime) -> Self {
37 Self { config: CliRunnerConfig::new(), tokio_runtime }
38 }
39
40 pub const fn with_config(mut self, config: CliRunnerConfig) -> Self {
42 self.config = config;
43 self
44 }
45
46 pub fn block_on<F, T>(&self, fut: F) -> T
48 where
49 F: Future<Output = T>,
50 {
51 self.tokio_runtime.block_on(fut)
52 }
53
54 pub fn run_command_until_exit<F, E>(
60 self,
61 command: impl FnOnce(CliContext) -> F,
62 ) -> Result<(), E>
63 where
64 F: Future<Output = Result<(), E>>,
65 E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
66 {
67 let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
68 AsyncCliRunner::new(self.tokio_runtime);
69
70 let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
72 &mut task_manager,
73 run_until_ctrl_c(command(context)),
74 ));
75
76 if command_res.is_err() {
77 error!(target: "reth::cli", "shutting down due to error");
78 } else {
79 debug!(target: "reth::cli", "shutting down gracefully");
80 task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
84 }
85
86 let (tx, rx) = mpsc::channel();
91 std::thread::Builder::new()
92 .name("tokio-runtime-shutdown".to_string())
93 .spawn(move || {
94 drop(tokio_runtime);
95 let _ = tx.send(());
96 })
97 .unwrap();
98
99 let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
100 debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
101 });
102
103 command_res
104 }
105
106 pub fn run_blocking_command_until_exit<F, E>(
110 self,
111 command: impl FnOnce(CliContext) -> F + Send + 'static,
112 ) -> Result<(), E>
113 where
114 F: Future<Output = Result<(), E>> + Send + 'static,
115 E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
116 {
117 let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
118 AsyncCliRunner::new(self.tokio_runtime);
119
120 let handle = tokio_runtime.handle().clone();
122 let command_handle =
123 tokio_runtime.handle().spawn_blocking(move || handle.block_on(command(context)));
124
125 let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
127 &mut task_manager,
128 run_until_ctrl_c(
129 async move { command_handle.await.expect("Failed to join blocking task") },
130 ),
131 ));
132
133 if command_res.is_err() {
134 error!(target: "reth::cli", "shutting down due to error");
135 } else {
136 debug!(target: "reth::cli", "shutting down gracefully");
137 task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
138 }
139
140 let (tx, rx) = mpsc::channel();
142 std::thread::Builder::new()
143 .name("tokio-runtime-shutdown".to_string())
144 .spawn(move || {
145 drop(tokio_runtime);
146 let _ = tx.send(());
147 })
148 .unwrap();
149
150 let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
151 debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
152 });
153
154 command_res
155 }
156
157 pub fn run_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
159 where
160 F: Future<Output = Result<(), E>>,
161 E: Send + Sync + From<std::io::Error> + 'static,
162 {
163 self.tokio_runtime.block_on(run_until_ctrl_c(fut))?;
164 Ok(())
165 }
166
167 pub fn run_blocking_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
172 where
173 F: Future<Output = Result<(), E>> + Send + 'static,
174 E: Send + Sync + From<std::io::Error> + 'static,
175 {
176 let tokio_runtime = self.tokio_runtime;
177 let handle = tokio_runtime.handle().clone();
178 let fut = tokio_runtime.handle().spawn_blocking(move || handle.block_on(fut));
179 tokio_runtime
180 .block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
181
182 std::thread::Builder::new()
186 .name("tokio-runtime-shutdown".to_string())
187 .spawn(move || drop(tokio_runtime))
188 .unwrap();
189
190 Ok(())
191 }
192}
193
194struct AsyncCliRunner {
196 context: CliContext,
197 task_manager: TaskManager,
198 tokio_runtime: tokio::runtime::Runtime,
199}
200
201impl AsyncCliRunner {
204 fn new(tokio_runtime: tokio::runtime::Runtime) -> Self {
207 let task_manager = TaskManager::new(tokio_runtime.handle().clone());
208 let task_executor = task_manager.executor();
209 Self { context: CliContext { task_executor }, task_manager, tokio_runtime }
210 }
211}
212
213#[derive(Debug)]
215pub struct CliContext {
216 pub task_executor: TaskExecutor,
218}
219
220const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
222
223#[derive(Debug, Clone)]
225pub struct CliRunnerConfig {
226 pub graceful_shutdown_timeout: Duration,
231}
232
233impl Default for CliRunnerConfig {
234 fn default() -> Self {
235 Self::new()
236 }
237}
238
239impl CliRunnerConfig {
240 pub const fn new() -> Self {
242 Self { graceful_shutdown_timeout: DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT }
243 }
244
245 pub const fn with_graceful_shutdown_timeout(mut self, timeout: Duration) -> Self {
247 self.graceful_shutdown_timeout = timeout;
248 self
249 }
250}
251
252pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
255 tokio::runtime::Builder::new_multi_thread().enable_all().build()
256}
257
258async fn run_to_completion_or_panic<F, E>(tasks: &mut TaskManager, fut: F) -> Result<(), E>
262where
263 F: Future<Output = Result<(), E>>,
264 E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
265{
266 {
267 let fut = pin!(fut);
268 tokio::select! {
269 task_manager_result = tasks => {
270 if let Err(panicked_error) = task_manager_result {
271 return Err(panicked_error.into());
272 }
273 },
274 res = fut => res?,
275 }
276 }
277 Ok(())
278}
279
280async fn run_until_ctrl_c<F, E>(fut: F) -> Result<(), E>
284where
285 F: Future<Output = Result<(), E>>,
286 E: Send + Sync + 'static + From<std::io::Error>,
287{
288 let ctrl_c = tokio::signal::ctrl_c();
289
290 #[cfg(unix)]
291 {
292 let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
293 let sigterm = stream.recv();
294 let sigterm = pin!(sigterm);
295 let ctrl_c = pin!(ctrl_c);
296 let fut = pin!(fut);
297
298 tokio::select! {
299 _ = ctrl_c => {
300 trace!(target: "reth::cli", "Received ctrl-c");
301 },
302 _ = sigterm => {
303 trace!(target: "reth::cli", "Received SIGTERM");
304 },
305 res = fut => res?,
306 }
307 }
308
309 #[cfg(not(unix))]
310 {
311 let ctrl_c = pin!(ctrl_c);
312 let fut = pin!(fut);
313
314 tokio::select! {
315 _ = ctrl_c => {
316 trace!(target: "reth::cli", "Received ctrl-c");
317 },
318 res = fut => res?,
319 }
320 }
321
322 Ok(())
323}