reth_cli_runner/
lib.rs

1//! A tokio based CLI runner.
2
3#![doc(
4    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg))]
10
11//! Entrypoint for running commands.
12
13use reth_tasks::{TaskExecutor, TaskManager};
14use std::{future::Future, pin::pin, sync::mpsc, time::Duration};
15use tracing::{debug, error, trace};
16
17/// Executes CLI commands.
18///
19/// Provides utilities for running a cli command to completion.
20#[derive(Debug)]
21pub struct CliRunner {
22    config: CliRunnerConfig,
23    tokio_runtime: tokio::runtime::Runtime,
24}
25
26impl CliRunner {
27    /// Attempts to create a new [`CliRunner`] using the default tokio
28    /// [`Runtime`](tokio::runtime::Runtime).
29    ///
30    /// The default tokio runtime is multi-threaded, with both I/O and time drivers enabled.
31    pub fn try_default_runtime() -> Result<Self, std::io::Error> {
32        Ok(Self { config: CliRunnerConfig::default(), tokio_runtime: tokio_runtime()? })
33    }
34
35    /// Create a new [`CliRunner`] from a provided tokio [`Runtime`](tokio::runtime::Runtime).
36    pub const fn from_runtime(tokio_runtime: tokio::runtime::Runtime) -> Self {
37        Self { config: CliRunnerConfig::new(), tokio_runtime }
38    }
39
40    /// Sets the [`CliRunnerConfig`] for this runner.
41    pub const fn with_config(mut self, config: CliRunnerConfig) -> Self {
42        self.config = config;
43        self
44    }
45
46    /// Executes an async block on the runtime and blocks until completion.
47    pub fn block_on<F, T>(&self, fut: F) -> T
48    where
49        F: Future<Output = T>,
50    {
51        self.tokio_runtime.block_on(fut)
52    }
53
54    /// Executes the given _async_ command on the tokio runtime until the command future resolves or
55    /// until the process receives a `SIGINT` or `SIGTERM` signal.
56    ///
57    /// Tasks spawned by the command via the [`TaskExecutor`] are shut down and an attempt is made
58    /// to drive their shutdown to completion after the command has finished.
59    pub fn run_command_until_exit<F, E>(
60        self,
61        command: impl FnOnce(CliContext) -> F,
62    ) -> Result<(), E>
63    where
64        F: Future<Output = Result<(), E>>,
65        E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
66    {
67        let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
68            AsyncCliRunner::new(self.tokio_runtime);
69
70        // Executes the command until it finished or ctrl-c was fired
71        let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
72            &mut task_manager,
73            run_until_ctrl_c(command(context)),
74        ));
75
76        if command_res.is_err() {
77            error!(target: "reth::cli", "shutting down due to error");
78        } else {
79            debug!(target: "reth::cli", "shutting down gracefully");
80            // after the command has finished or exit signal was received we shutdown the task
81            // manager which fires the shutdown signal to all tasks spawned via the task
82            // executor and awaiting on tasks spawned with graceful shutdown
83            task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
84        }
85
86        // `drop(tokio_runtime)` would block the current thread until its pools
87        // (including blocking pool) are shutdown. Since we want to exit as soon as possible, drop
88        // it on a separate thread and wait for up to 5 seconds for this operation to
89        // complete.
90        let (tx, rx) = mpsc::channel();
91        std::thread::Builder::new()
92            .name("tokio-runtime-shutdown".to_string())
93            .spawn(move || {
94                drop(tokio_runtime);
95                let _ = tx.send(());
96            })
97            .unwrap();
98
99        let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
100            debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
101        });
102
103        command_res
104    }
105
106    /// Executes a command in a blocking context with access to `CliContext`.
107    ///
108    /// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking).
109    pub fn run_blocking_command_until_exit<F, E>(
110        self,
111        command: impl FnOnce(CliContext) -> F + Send + 'static,
112    ) -> Result<(), E>
113    where
114        F: Future<Output = Result<(), E>> + Send + 'static,
115        E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
116    {
117        let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
118            AsyncCliRunner::new(self.tokio_runtime);
119
120        // Spawn the command on the blocking thread pool
121        let handle = tokio_runtime.handle().clone();
122        let command_handle =
123            tokio_runtime.handle().spawn_blocking(move || handle.block_on(command(context)));
124
125        // Wait for the command to complete or ctrl-c
126        let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
127            &mut task_manager,
128            run_until_ctrl_c(
129                async move { command_handle.await.expect("Failed to join blocking task") },
130            ),
131        ));
132
133        if command_res.is_err() {
134            error!(target: "reth::cli", "shutting down due to error");
135        } else {
136            debug!(target: "reth::cli", "shutting down gracefully");
137            task_manager.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
138        }
139
140        // Shutdown the runtime on a separate thread
141        let (tx, rx) = mpsc::channel();
142        std::thread::Builder::new()
143            .name("tokio-runtime-shutdown".to_string())
144            .spawn(move || {
145                drop(tokio_runtime);
146                let _ = tx.send(());
147            })
148            .unwrap();
149
150        let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
151            debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
152        });
153
154        command_res
155    }
156
157    /// Executes a regular future until completion or until external signal received.
158    pub fn run_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
159    where
160        F: Future<Output = Result<(), E>>,
161        E: Send + Sync + From<std::io::Error> + 'static,
162    {
163        self.tokio_runtime.block_on(run_until_ctrl_c(fut))?;
164        Ok(())
165    }
166
167    /// Executes a regular future as a spawned blocking task until completion or until external
168    /// signal received.
169    ///
170    /// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking) .
171    pub fn run_blocking_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
172    where
173        F: Future<Output = Result<(), E>> + Send + 'static,
174        E: Send + Sync + From<std::io::Error> + 'static,
175    {
176        let tokio_runtime = self.tokio_runtime;
177        let handle = tokio_runtime.handle().clone();
178        let fut = tokio_runtime.handle().spawn_blocking(move || handle.block_on(fut));
179        tokio_runtime
180            .block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
181
182        // drop the tokio runtime on a separate thread because drop blocks until its pools
183        // (including blocking pool) are shutdown. In other words `drop(tokio_runtime)` would block
184        // the current thread but we want to exit right away.
185        std::thread::Builder::new()
186            .name("tokio-runtime-shutdown".to_string())
187            .spawn(move || drop(tokio_runtime))
188            .unwrap();
189
190        Ok(())
191    }
192}
193
194/// [`CliRunner`] configuration when executing commands asynchronously
195struct AsyncCliRunner {
196    context: CliContext,
197    task_manager: TaskManager,
198    tokio_runtime: tokio::runtime::Runtime,
199}
200
201// === impl AsyncCliRunner ===
202
203impl AsyncCliRunner {
204    /// Given a tokio [`Runtime`](tokio::runtime::Runtime), creates additional context required to
205    /// execute commands asynchronously.
206    fn new(tokio_runtime: tokio::runtime::Runtime) -> Self {
207        let task_manager = TaskManager::new(tokio_runtime.handle().clone());
208        let task_executor = task_manager.executor();
209        Self { context: CliContext { task_executor }, task_manager, tokio_runtime }
210    }
211}
212
213/// Additional context provided by the [`CliRunner`] when executing commands
214#[derive(Debug)]
215pub struct CliContext {
216    /// Used to execute/spawn tasks
217    pub task_executor: TaskExecutor,
218}
219
/// Graceful shutdown timeout used by [`CliRunnerConfig::new`]: five seconds.
const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
222
/// Settings that control how a [`CliRunner`] behaves.
#[derive(Clone, Debug)]
pub struct CliRunnerConfig {
    /// Maximum time granted to spawned tasks for a graceful shutdown.
    ///
    /// Once the command has completed, the runner waits at most this long for spawned tasks to
    /// finish before they are terminated forcefully.
    pub graceful_shutdown_timeout: Duration,
}
232
233impl Default for CliRunnerConfig {
234    fn default() -> Self {
235        Self::new()
236    }
237}
238
239impl CliRunnerConfig {
240    /// Creates a new config with default values.
241    pub const fn new() -> Self {
242        Self { graceful_shutdown_timeout: DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT }
243    }
244
245    /// Sets the graceful shutdown timeout.
246    pub const fn with_graceful_shutdown_timeout(mut self, timeout: Duration) -> Self {
247        self.graceful_shutdown_timeout = timeout;
248        self
249    }
250}
251
252/// Creates a new default tokio multi-thread [Runtime](tokio::runtime::Runtime) with all features
253/// enabled
254pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
255    tokio::runtime::Builder::new_multi_thread().enable_all().build()
256}
257
258/// Runs the given future to completion or until a critical task panicked.
259///
260/// Returns the error if a task panicked, or the given future returned an error.
261async fn run_to_completion_or_panic<F, E>(tasks: &mut TaskManager, fut: F) -> Result<(), E>
262where
263    F: Future<Output = Result<(), E>>,
264    E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
265{
266    {
267        let fut = pin!(fut);
268        tokio::select! {
269            task_manager_result = tasks => {
270                if let Err(panicked_error) = task_manager_result {
271                    return Err(panicked_error.into());
272                }
273            },
274            res = fut => res?,
275        }
276    }
277    Ok(())
278}
279
280/// Runs the future to completion or until:
281/// - `ctrl-c` is received.
282/// - `SIGTERM` is received (unix only).
283async fn run_until_ctrl_c<F, E>(fut: F) -> Result<(), E>
284where
285    F: Future<Output = Result<(), E>>,
286    E: Send + Sync + 'static + From<std::io::Error>,
287{
288    let ctrl_c = tokio::signal::ctrl_c();
289
290    #[cfg(unix)]
291    {
292        let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
293        let sigterm = stream.recv();
294        let sigterm = pin!(sigterm);
295        let ctrl_c = pin!(ctrl_c);
296        let fut = pin!(fut);
297
298        tokio::select! {
299            _ = ctrl_c => {
300                trace!(target: "reth::cli", "Received ctrl-c");
301            },
302            _ = sigterm => {
303                trace!(target: "reth::cli", "Received SIGTERM");
304            },
305            res = fut => res?,
306        }
307    }
308
309    #[cfg(not(unix))]
310    {
311        let ctrl_c = pin!(ctrl_c);
312        let fut = pin!(fut);
313
314        tokio::select! {
315            _ = ctrl_c => {
316                trace!(target: "reth::cli", "Received ctrl-c");
317            },
318            res = fut => res?,
319        }
320    }
321
322    Ok(())
323}