Skip to main content

reth_cli_runner/
lib.rs

1//! A tokio based CLI runner.
2
3#![doc(
4    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg))]
10
11//! Entrypoint for running commands.
12
13use reth_tasks::{PanickedTaskError, TaskExecutor};
14use std::{future::Future, pin::pin, sync::mpsc, time::Duration};
15use tokio::task::JoinHandle;
16use tracing::{debug, error, info};
17
18/// Executes CLI commands.
19///
20/// Provides utilities for running a cli command to completion.
21#[derive(Debug)]
22pub struct CliRunner {
23    config: CliRunnerConfig,
24    runtime: reth_tasks::Runtime,
25}
26
27impl CliRunner {
28    /// Attempts to create a new [`CliRunner`] using the default
29    /// [`Runtime`](reth_tasks::Runtime).
30    ///
31    /// The default runtime is multi-threaded, with both I/O and time drivers enabled.
32    pub fn try_default_runtime() -> Result<Self, reth_tasks::RuntimeBuildError> {
33        Self::try_with_runtime_config(reth_tasks::RuntimeConfig::default())
34    }
35
36    /// Creates a new [`CliRunner`] with the given [`RuntimeConfig`](reth_tasks::RuntimeConfig).
37    pub fn try_with_runtime_config(
38        config: reth_tasks::RuntimeConfig,
39    ) -> Result<Self, reth_tasks::RuntimeBuildError> {
40        let runtime = reth_tasks::RuntimeBuilder::new(config).build()?;
41        Ok(Self { config: CliRunnerConfig::default(), runtime })
42    }
43
44    /// Sets the [`CliRunnerConfig`] for this runner.
45    pub const fn with_config(mut self, config: CliRunnerConfig) -> Self {
46        self.config = config;
47        self
48    }
49
50    /// Returns a clone of the underlying [`Runtime`](reth_tasks::Runtime).
51    pub fn runtime(&self) -> reth_tasks::Runtime {
52        self.runtime.clone()
53    }
54
55    /// Executes an async block on the runtime and blocks until completion.
56    pub fn block_on<F, T>(&self, fut: F) -> T
57    where
58        F: Future<Output = T>,
59    {
60        self.runtime.handle().block_on(fut)
61    }
62
63    /// Executes the given _async_ command on the tokio runtime until the command future resolves or
64    /// until the process receives a `SIGINT` or `SIGTERM` signal.
65    ///
66    /// Tasks spawned by the command via the [`TaskExecutor`] are shut down and an attempt is made
67    /// to drive their shutdown to completion after the command has finished.
68    pub fn run_command_until_exit<F, E>(
69        self,
70        command: impl FnOnce(CliContext) -> F,
71    ) -> Result<(), E>
72    where
73        F: Future<Output = Result<(), E>>,
74        E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
75    {
76        let (context, task_manager_handle) = cli_context(&self.runtime);
77
78        // Executes the command until it finished or ctrl-c was fired
79        let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
80            task_manager_handle,
81            run_until_ctrl_c(command(context)),
82        ));
83
84        if command_res.is_err() {
85            error!(target: "reth::cli", "shutting down due to error");
86        } else {
87            debug!(target: "reth::cli", "shutting down gracefully");
88            // after the command has finished or exit signal was received we shutdown the
89            // runtime which fires the shutdown signal to all tasks spawned via the task
90            // executor and awaiting on tasks spawned with graceful shutdown
91            self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
92        }
93
94        runtime_shutdown(self.runtime, true);
95
96        command_res
97    }
98
99    /// Executes a command in a blocking context with access to `CliContext`.
100    ///
101    /// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking).
102    pub fn run_blocking_command_until_exit<F, E>(
103        self,
104        command: impl FnOnce(CliContext) -> F + Send + 'static,
105    ) -> Result<(), E>
106    where
107        F: Future<Output = Result<(), E>> + Send + 'static,
108        E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
109    {
110        let (context, task_manager_handle) = cli_context(&self.runtime);
111
112        // Spawn the command on the blocking thread pool
113        let handle = self.runtime.handle().clone();
114        let handle2 = handle.clone();
115        let command_handle = handle.spawn_blocking(move || handle2.block_on(command(context)));
116
117        // Wait for the command to complete or ctrl-c
118        let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
119            task_manager_handle,
120            run_until_ctrl_c(
121                async move { command_handle.await.expect("Failed to join blocking task") },
122            ),
123        ));
124
125        if command_res.is_err() {
126            error!(target: "reth::cli", "shutting down due to error");
127        } else {
128            debug!(target: "reth::cli", "shutting down gracefully");
129            self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
130        }
131
132        runtime_shutdown(self.runtime, true);
133
134        command_res
135    }
136
137    /// Executes a regular future until completion or until external signal received.
138    pub fn run_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
139    where
140        F: Future<Output = Result<(), E>>,
141        E: Send + Sync + From<std::io::Error> + 'static,
142    {
143        self.runtime.handle().block_on(run_until_ctrl_c(fut))?;
144        Ok(())
145    }
146
147    /// Executes a regular future as a spawned blocking task until completion or until external
148    /// signal received.
149    ///
150    /// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking).
151    pub fn run_blocking_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
152    where
153        F: Future<Output = Result<(), E>> + Send + 'static,
154        E: Send + Sync + From<std::io::Error> + 'static,
155    {
156        let handle = self.runtime.handle().clone();
157        let handle2 = handle.clone();
158        let fut = handle.spawn_blocking(move || handle2.block_on(fut));
159        self.runtime
160            .handle()
161            .block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
162
163        runtime_shutdown(self.runtime, false);
164
165        Ok(())
166    }
167}
168
169/// Extracts the task manager handle from the runtime and creates the [`CliContext`].
170fn cli_context(
171    runtime: &reth_tasks::Runtime,
172) -> (CliContext, JoinHandle<Result<(), PanickedTaskError>>) {
173    let handle =
174        runtime.take_task_manager_handle().expect("Runtime must contain a TaskManager handle");
175    let context = CliContext { task_executor: runtime.clone() };
176    (context, handle)
177}
178
179/// Additional context provided by the [`CliRunner`] when executing commands
180#[derive(Debug)]
181pub struct CliContext {
182    /// Used to execute/spawn tasks
183    pub task_executor: TaskExecutor,
184}
185
186/// Default timeout for graceful shutdown of tasks.
187const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
188
189/// Configuration for [`CliRunner`].
190#[derive(Debug, Clone)]
191pub struct CliRunnerConfig {
192    /// Timeout for graceful shutdown of tasks.
193    ///
194    /// After the command completes, this is the maximum time to wait for spawned tasks
195    /// to finish before forcefully terminating them.
196    pub graceful_shutdown_timeout: Duration,
197}
198
199impl Default for CliRunnerConfig {
200    fn default() -> Self {
201        Self::new()
202    }
203}
204
205impl CliRunnerConfig {
206    /// Creates a new config with default values.
207    pub const fn new() -> Self {
208        Self { graceful_shutdown_timeout: DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT }
209    }
210
211    /// Sets the graceful shutdown timeout.
212    pub const fn with_graceful_shutdown_timeout(mut self, timeout: Duration) -> Self {
213        self.graceful_shutdown_timeout = timeout;
214        self
215    }
216}
217
218/// Runs the given future to completion or until a critical task panicked.
219///
220/// Returns the error if a task panicked, or the given future returned an error.
221async fn run_to_completion_or_panic<F, E>(
222    task_manager_handle: JoinHandle<Result<(), PanickedTaskError>>,
223    fut: F,
224) -> Result<(), E>
225where
226    F: Future<Output = Result<(), E>>,
227    E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
228{
229    let fut = pin!(fut);
230    tokio::select! {
231        task_manager_result = task_manager_handle => {
232            if let Ok(Err(panicked_error)) = task_manager_result {
233                return Err(panicked_error.into());
234            }
235        },
236        res = fut => res?,
237    }
238    Ok(())
239}
240
241/// Runs the future to completion or until:
242/// - `ctrl-c` is received.
243/// - `SIGTERM` is received (unix only).
244async fn run_until_ctrl_c<F, E>(fut: F) -> Result<(), E>
245where
246    F: Future<Output = Result<(), E>>,
247    E: Send + Sync + 'static + From<std::io::Error>,
248{
249    let ctrl_c = tokio::signal::ctrl_c();
250
251    #[cfg(unix)]
252    {
253        let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
254        let sigterm = stream.recv();
255        let sigterm = pin!(sigterm);
256        let ctrl_c = pin!(ctrl_c);
257        let fut = pin!(fut);
258
259        tokio::select! {
260            _ = ctrl_c => {
261                info!(target: "reth::cli", "Received ctrl-c");
262            },
263            _ = sigterm => {
264                info!(target: "reth::cli", "Received SIGTERM");
265            },
266            res = fut => res?,
267        }
268    }
269
270    #[cfg(not(unix))]
271    {
272        let ctrl_c = pin!(ctrl_c);
273        let fut = pin!(fut);
274
275        tokio::select! {
276            _ = ctrl_c => {
277                info!(target: "reth::cli", "Received ctrl-c");
278            },
279            res = fut => res?,
280        }
281    }
282
283    Ok(())
284}
285
286/// Default timeout for waiting on the tokio runtime to shut down.
287const DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
288
289/// Shut down the given [`Runtime`](reth_tasks::Runtime), and wait for it if `wait` is set.
290///
291/// Dropping the runtime on the current thread could block due to tokio pool teardown.
292/// Instead, we drop it on a separate thread and optionally wait for completion.
293fn runtime_shutdown(rt: reth_tasks::Runtime, wait: bool) {
294    let (tx, rx) = mpsc::channel();
295    std::thread::Builder::new()
296        .name("rt-shutdown".to_string())
297        .spawn(move || {
298            drop(rt);
299            let _ = tx.send(());
300        })
301        .unwrap();
302
303    if wait {
304        let _ = rx.recv_timeout(DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT).inspect_err(|err| {
305            tracing::warn!(target: "reth::cli", %err, "runtime shutdown timed out");
306        });
307    }
308}