Skip to main content

reth_cli_runner/
lib.rs

1//! A tokio based CLI runner.
2
3#![doc(
4    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg))]
10
11//! Entrypoint for running commands.
12
13use reth_tasks::{PanickedTaskError, TaskExecutor};
14use std::{future::Future, pin::pin, sync::mpsc, time::Duration};
15use tokio::task::JoinHandle;
16use tracing::{debug, error, info};
17
18/// Executes CLI commands.
19///
20/// Provides utilities for running a cli command to completion.
21#[derive(Debug)]
22pub struct CliRunner {
23    config: CliRunnerConfig,
24    runtime: reth_tasks::Runtime,
25}
26
27impl CliRunner {
28    /// Attempts to create a new [`CliRunner`] using the default
29    /// [`Runtime`](reth_tasks::Runtime).
30    ///
31    /// The default runtime is multi-threaded, with both I/O and time drivers enabled.
32    pub fn try_default_runtime() -> Result<Self, reth_tasks::RuntimeBuildError> {
33        Self::try_with_runtime_config(reth_tasks::RuntimeConfig::default())
34    }
35
36    /// Creates a new [`CliRunner`] with the given [`RuntimeConfig`](reth_tasks::RuntimeConfig).
37    pub fn try_with_runtime_config(
38        config: reth_tasks::RuntimeConfig,
39    ) -> Result<Self, reth_tasks::RuntimeBuildError> {
40        let runtime = reth_tasks::RuntimeBuilder::new(config).build()?;
41        Ok(Self { config: CliRunnerConfig::default(), runtime })
42    }
43
44    /// Sets the [`CliRunnerConfig`] for this runner.
45    pub const fn with_config(mut self, config: CliRunnerConfig) -> Self {
46        self.config = config;
47        self
48    }
49
50    /// Executes an async block on the runtime and blocks until completion.
51    pub fn block_on<F, T>(&self, fut: F) -> T
52    where
53        F: Future<Output = T>,
54    {
55        self.runtime.handle().block_on(fut)
56    }
57
58    /// Executes the given _async_ command on the tokio runtime until the command future resolves or
59    /// until the process receives a `SIGINT` or `SIGTERM` signal.
60    ///
61    /// Tasks spawned by the command via the [`TaskExecutor`] are shut down and an attempt is made
62    /// to drive their shutdown to completion after the command has finished.
63    pub fn run_command_until_exit<F, E>(
64        self,
65        command: impl FnOnce(CliContext) -> F,
66    ) -> Result<(), E>
67    where
68        F: Future<Output = Result<(), E>>,
69        E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
70    {
71        let (context, task_manager_handle) = cli_context(&self.runtime);
72
73        // Executes the command until it finished or ctrl-c was fired
74        let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
75            task_manager_handle,
76            run_until_ctrl_c(command(context)),
77        ));
78
79        if command_res.is_err() {
80            error!(target: "reth::cli", "shutting down due to error");
81        } else {
82            debug!(target: "reth::cli", "shutting down gracefully");
83            // after the command has finished or exit signal was received we shutdown the
84            // runtime which fires the shutdown signal to all tasks spawned via the task
85            // executor and awaiting on tasks spawned with graceful shutdown
86            self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
87        }
88
89        runtime_shutdown(self.runtime, true);
90
91        command_res
92    }
93
94    /// Executes a command in a blocking context with access to `CliContext`.
95    ///
96    /// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking).
97    pub fn run_blocking_command_until_exit<F, E>(
98        self,
99        command: impl FnOnce(CliContext) -> F + Send + 'static,
100    ) -> Result<(), E>
101    where
102        F: Future<Output = Result<(), E>> + Send + 'static,
103        E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
104    {
105        let (context, task_manager_handle) = cli_context(&self.runtime);
106
107        // Spawn the command on the blocking thread pool
108        let handle = self.runtime.handle().clone();
109        let handle2 = handle.clone();
110        let command_handle = handle.spawn_blocking(move || handle2.block_on(command(context)));
111
112        // Wait for the command to complete or ctrl-c
113        let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
114            task_manager_handle,
115            run_until_ctrl_c(
116                async move { command_handle.await.expect("Failed to join blocking task") },
117            ),
118        ));
119
120        if command_res.is_err() {
121            error!(target: "reth::cli", "shutting down due to error");
122        } else {
123            debug!(target: "reth::cli", "shutting down gracefully");
124            self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
125        }
126
127        runtime_shutdown(self.runtime, true);
128
129        command_res
130    }
131
132    /// Executes a regular future until completion or until external signal received.
133    pub fn run_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
134    where
135        F: Future<Output = Result<(), E>>,
136        E: Send + Sync + From<std::io::Error> + 'static,
137    {
138        self.runtime.handle().block_on(run_until_ctrl_c(fut))?;
139        Ok(())
140    }
141
142    /// Executes a regular future as a spawned blocking task until completion or until external
143    /// signal received.
144    ///
145    /// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking).
146    pub fn run_blocking_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
147    where
148        F: Future<Output = Result<(), E>> + Send + 'static,
149        E: Send + Sync + From<std::io::Error> + 'static,
150    {
151        let handle = self.runtime.handle().clone();
152        let handle2 = handle.clone();
153        let fut = handle.spawn_blocking(move || handle2.block_on(fut));
154        self.runtime
155            .handle()
156            .block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
157
158        runtime_shutdown(self.runtime, false);
159
160        Ok(())
161    }
162}
163
164/// Extracts the task manager handle from the runtime and creates the [`CliContext`].
165fn cli_context(
166    runtime: &reth_tasks::Runtime,
167) -> (CliContext, JoinHandle<Result<(), PanickedTaskError>>) {
168    let handle =
169        runtime.take_task_manager_handle().expect("Runtime must contain a TaskManager handle");
170    let context = CliContext { task_executor: runtime.clone() };
171    (context, handle)
172}
173
174/// Additional context provided by the [`CliRunner`] when executing commands
175#[derive(Debug)]
176pub struct CliContext {
177    /// Used to execute/spawn tasks
178    pub task_executor: TaskExecutor,
179}
180
181/// Default timeout for graceful shutdown of tasks.
182const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
183
184/// Configuration for [`CliRunner`].
185#[derive(Debug, Clone)]
186pub struct CliRunnerConfig {
187    /// Timeout for graceful shutdown of tasks.
188    ///
189    /// After the command completes, this is the maximum time to wait for spawned tasks
190    /// to finish before forcefully terminating them.
191    pub graceful_shutdown_timeout: Duration,
192}
193
194impl Default for CliRunnerConfig {
195    fn default() -> Self {
196        Self::new()
197    }
198}
199
200impl CliRunnerConfig {
201    /// Creates a new config with default values.
202    pub const fn new() -> Self {
203        Self { graceful_shutdown_timeout: DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT }
204    }
205
206    /// Sets the graceful shutdown timeout.
207    pub const fn with_graceful_shutdown_timeout(mut self, timeout: Duration) -> Self {
208        self.graceful_shutdown_timeout = timeout;
209        self
210    }
211}
212
213/// Runs the given future to completion or until a critical task panicked.
214///
215/// Returns the error if a task panicked, or the given future returned an error.
216async fn run_to_completion_or_panic<F, E>(
217    task_manager_handle: JoinHandle<Result<(), PanickedTaskError>>,
218    fut: F,
219) -> Result<(), E>
220where
221    F: Future<Output = Result<(), E>>,
222    E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
223{
224    let fut = pin!(fut);
225    tokio::select! {
226        task_manager_result = task_manager_handle => {
227            if let Ok(Err(panicked_error)) = task_manager_result {
228                return Err(panicked_error.into());
229            }
230        },
231        res = fut => res?,
232    }
233    Ok(())
234}
235
236/// Runs the future to completion or until:
237/// - `ctrl-c` is received.
238/// - `SIGTERM` is received (unix only).
239async fn run_until_ctrl_c<F, E>(fut: F) -> Result<(), E>
240where
241    F: Future<Output = Result<(), E>>,
242    E: Send + Sync + 'static + From<std::io::Error>,
243{
244    let ctrl_c = tokio::signal::ctrl_c();
245
246    #[cfg(unix)]
247    {
248        let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
249        let sigterm = stream.recv();
250        let sigterm = pin!(sigterm);
251        let ctrl_c = pin!(ctrl_c);
252        let fut = pin!(fut);
253
254        tokio::select! {
255            _ = ctrl_c => {
256                info!(target: "reth::cli", "Received ctrl-c");
257            },
258            _ = sigterm => {
259                info!(target: "reth::cli", "Received SIGTERM");
260            },
261            res = fut => res?,
262        }
263    }
264
265    #[cfg(not(unix))]
266    {
267        let ctrl_c = pin!(ctrl_c);
268        let fut = pin!(fut);
269
270        tokio::select! {
271            _ = ctrl_c => {
272                info!(target: "reth::cli", "Received ctrl-c");
273            },
274            res = fut => res?,
275        }
276    }
277
278    Ok(())
279}
280
281/// Default timeout for waiting on the tokio runtime to shut down.
282const DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
283
284/// Shut down the given [`Runtime`](reth_tasks::Runtime), and wait for it if `wait` is set.
285///
286/// Dropping the runtime on the current thread could block due to tokio pool teardown.
287/// Instead, we drop it on a separate thread and optionally wait for completion.
288fn runtime_shutdown(rt: reth_tasks::Runtime, wait: bool) {
289    let (tx, rx) = mpsc::channel();
290    std::thread::Builder::new()
291        .name("rt-shutdown".to_string())
292        .spawn(move || {
293            drop(rt);
294            let _ = tx.send(());
295        })
296        .unwrap();
297
298    if wait {
299        let _ = rx.recv_timeout(DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT).inspect_err(|err| {
300            tracing::warn!(target: "reth::cli", %err, "runtime shutdown timed out");
301        });
302    }
303}