Skip to main content

reth_cli_runner/
lib.rs

1//! A tokio based CLI runner.
2
3#![doc(
4    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg))]
10
11//! Entrypoint for running commands.
12
13use reth_tasks::{PanickedTaskError, TaskExecutor};
14use std::{future::Future, pin::pin, sync::mpsc, time::Duration};
15use tokio::task::JoinHandle;
16use tracing::{debug, error, info};
17
18/// Executes CLI commands.
19///
20/// Provides utilities for running a cli command to completion.
21#[derive(Debug)]
22pub struct CliRunner {
23    config: CliRunnerConfig,
24    runtime: reth_tasks::Runtime,
25}
26
27impl CliRunner {
28    /// Attempts to create a new [`CliRunner`] using the default
29    /// [`Runtime`](reth_tasks::Runtime).
30    ///
31    /// The default runtime is multi-threaded, with both I/O and time drivers enabled.
32    pub fn try_default_runtime() -> Result<Self, reth_tasks::RuntimeBuildError> {
33        Self::try_with_runtime_config(reth_tasks::RuntimeConfig::default())
34    }
35
36    /// Creates a new [`CliRunner`] with the given [`RuntimeConfig`](reth_tasks::RuntimeConfig).
37    pub fn try_with_runtime_config(
38        config: reth_tasks::RuntimeConfig,
39    ) -> Result<Self, reth_tasks::RuntimeBuildError> {
40        let runtime = reth_tasks::RuntimeBuilder::new(config).build()?;
41        Ok(Self { config: CliRunnerConfig::default(), runtime })
42    }
43
44    /// Sets the [`CliRunnerConfig`] for this runner.
45    pub const fn with_config(mut self, config: CliRunnerConfig) -> Self {
46        self.config = config;
47        self
48    }
49
50    /// Returns a clone of the underlying [`Runtime`](reth_tasks::Runtime).
51    pub fn runtime(&self) -> reth_tasks::Runtime {
52        self.runtime.clone()
53    }
54
55    /// Executes an async block on the runtime and blocks until completion.
56    pub fn block_on<F, T>(&self, fut: F) -> T
57    where
58        F: Future<Output = T>,
59    {
60        self.runtime.handle().block_on(fut)
61    }
62
63    /// Executes the given _async_ command on the tokio runtime until the command future resolves or
64    /// until the process receives a `SIGINT` or `SIGTERM` signal.
65    ///
66    /// Tasks spawned by the command via the [`TaskExecutor`] are shut down and an attempt is made
67    /// to drive their shutdown to completion after the command has finished.
68    pub fn run_command_until_exit<F, E>(
69        self,
70        command: impl FnOnce(CliContext) -> F,
71    ) -> Result<(), E>
72    where
73        F: Future<Output = Result<(), E>>,
74        E: Send
75            + Sync
76            + std::fmt::Display
77            + From<std::io::Error>
78            + From<reth_tasks::PanickedTaskError>
79            + 'static,
80    {
81        let (context, task_manager_handle) = cli_context(&self.runtime);
82
83        // Executes the command until it finished or ctrl-c was fired
84        let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
85            task_manager_handle,
86            run_until_ctrl_c(command(context)),
87        ));
88
89        if let Err(err) = &command_res {
90            error!(target: "reth::cli", %err, "shutting down due to error");
91        } else {
92            debug!(target: "reth::cli", "shutting down gracefully");
93            // after the command has finished or exit signal was received we shutdown the
94            // runtime which fires the shutdown signal to all tasks spawned via the task
95            // executor and awaiting on tasks spawned with graceful shutdown
96            self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
97        }
98
99        runtime_shutdown(self.runtime, true);
100
101        command_res
102    }
103
104    /// Executes a command in a blocking context with access to `CliContext`.
105    ///
106    /// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking).
107    pub fn run_blocking_command_until_exit<F, E>(
108        self,
109        command: impl FnOnce(CliContext) -> F + Send + 'static,
110    ) -> Result<(), E>
111    where
112        F: Future<Output = Result<(), E>> + Send + 'static,
113        E: Send
114            + Sync
115            + std::fmt::Display
116            + From<std::io::Error>
117            + From<reth_tasks::PanickedTaskError>
118            + 'static,
119    {
120        let (context, task_manager_handle) = cli_context(&self.runtime);
121
122        // Spawn the command on the blocking thread pool
123        let handle = self.runtime.handle().clone();
124        let handle2 = handle.clone();
125        let command_handle = handle.spawn_blocking(move || handle2.block_on(command(context)));
126
127        // Wait for the command to complete or ctrl-c
128        let command_res = self.runtime.handle().block_on(run_to_completion_or_panic(
129            task_manager_handle,
130            run_until_ctrl_c(
131                async move { command_handle.await.expect("Failed to join blocking task") },
132            ),
133        ));
134
135        if let Err(err) = &command_res {
136            error!(target: "reth::cli", %err, "shutting down due to error");
137        } else {
138            debug!(target: "reth::cli", "shutting down gracefully");
139            self.runtime.graceful_shutdown_with_timeout(self.config.graceful_shutdown_timeout);
140        }
141
142        runtime_shutdown(self.runtime, true);
143
144        command_res
145    }
146
147    /// Executes a regular future until completion or until external signal received.
148    pub fn run_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
149    where
150        F: Future<Output = Result<(), E>>,
151        E: Send + Sync + From<std::io::Error> + 'static,
152    {
153        self.runtime.handle().block_on(run_until_ctrl_c(fut))?;
154        Ok(())
155    }
156
157    /// Executes a regular future as a spawned blocking task until completion or until external
158    /// signal received.
159    ///
160    /// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking).
161    pub fn run_blocking_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
162    where
163        F: Future<Output = Result<(), E>> + Send + 'static,
164        E: Send + Sync + From<std::io::Error> + 'static,
165    {
166        let handle = self.runtime.handle().clone();
167        let handle2 = handle.clone();
168        let fut = handle.spawn_blocking(move || handle2.block_on(fut));
169        self.runtime
170            .handle()
171            .block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
172
173        runtime_shutdown(self.runtime, false);
174
175        Ok(())
176    }
177}
178
179/// Extracts the task manager handle from the runtime and creates the [`CliContext`].
180fn cli_context(
181    runtime: &reth_tasks::Runtime,
182) -> (CliContext, JoinHandle<Result<(), PanickedTaskError>>) {
183    let handle =
184        runtime.take_task_manager_handle().expect("Runtime must contain a TaskManager handle");
185    let context = CliContext { task_executor: runtime.clone() };
186    (context, handle)
187}
188
189/// Additional context provided by the [`CliRunner`] when executing commands
190#[derive(Debug)]
191pub struct CliContext {
192    /// Used to execute/spawn tasks
193    pub task_executor: TaskExecutor,
194}
195
/// Graceful shutdown timeout used when none is configured explicitly.
const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);

/// Settings that control how a [`CliRunner`] behaves.
#[derive(Debug, Clone)]
pub struct CliRunnerConfig {
    /// Timeout for graceful shutdown of tasks.
    ///
    /// Once the command has completed, spawned tasks get at most this long to finish
    /// before they are forcefully terminated.
    pub graceful_shutdown_timeout: Duration,
}

impl CliRunnerConfig {
    /// Builds a config populated with the default values.
    pub const fn new() -> Self {
        let graceful_shutdown_timeout = DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT;
        Self { graceful_shutdown_timeout }
    }

    /// Returns the config with its graceful shutdown timeout replaced by `timeout`.
    pub const fn with_graceful_shutdown_timeout(self, timeout: Duration) -> Self {
        let mut updated = self;
        updated.graceful_shutdown_timeout = timeout;
        updated
    }
}

impl Default for CliRunnerConfig {
    /// Equivalent to [`CliRunnerConfig::new`].
    fn default() -> Self {
        Self::new()
    }
}
227
228/// Runs the given future to completion or until a critical task panicked.
229///
230/// Returns the error if a task panicked, or the given future returned an error.
231async fn run_to_completion_or_panic<F, E>(
232    task_manager_handle: JoinHandle<Result<(), PanickedTaskError>>,
233    fut: F,
234) -> Result<(), E>
235where
236    F: Future<Output = Result<(), E>>,
237    E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
238{
239    let fut = pin!(fut);
240    tokio::select! {
241        task_manager_result = task_manager_handle => {
242            if let Ok(Err(panicked_error)) = task_manager_result {
243                return Err(panicked_error.into());
244            }
245        },
246        res = fut => res?,
247    }
248    Ok(())
249}
250
251/// Runs the future to completion or until:
252/// - `ctrl-c` is received.
253/// - `SIGTERM` is received (unix only).
254async fn run_until_ctrl_c<F, E>(fut: F) -> Result<(), E>
255where
256    F: Future<Output = Result<(), E>>,
257    E: Send + Sync + 'static + From<std::io::Error>,
258{
259    let ctrl_c = tokio::signal::ctrl_c();
260
261    #[cfg(unix)]
262    {
263        let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
264        let sigterm = stream.recv();
265        let sigterm = pin!(sigterm);
266        let ctrl_c = pin!(ctrl_c);
267        let fut = pin!(fut);
268
269        tokio::select! {
270            _ = ctrl_c => {
271                info!(target: "reth::cli", "Received ctrl-c");
272            },
273            _ = sigterm => {
274                info!(target: "reth::cli", "Received SIGTERM");
275            },
276            res = fut => res?,
277        }
278    }
279
280    #[cfg(not(unix))]
281    {
282        let ctrl_c = pin!(ctrl_c);
283        let fut = pin!(fut);
284
285        tokio::select! {
286            _ = ctrl_c => {
287                info!(target: "reth::cli", "Received ctrl-c");
288            },
289            res = fut => res?,
290        }
291    }
292
293    Ok(())
294}
295
296/// Default timeout for waiting on the tokio runtime to shut down.
297const DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
298
299/// Shut down the given [`Runtime`](reth_tasks::Runtime), and wait for it if `wait` is set.
300///
301/// Dropping the runtime on the current thread could block due to tokio pool teardown.
302/// Instead, we drop it on a separate thread and optionally wait for completion.
303fn runtime_shutdown(rt: reth_tasks::Runtime, wait: bool) {
304    let (tx, rx) = mpsc::channel();
305    std::thread::Builder::new()
306        .name("rt-shutdown".to_string())
307        .spawn(move || {
308            drop(rt);
309            let _ = tx.send(());
310        })
311        .unwrap();
312
313    if wait {
314        let _ = rx.recv_timeout(DEFAULT_RUNTIME_SHUTDOWN_TIMEOUT).inspect_err(|err| {
315            tracing::warn!(target: "reth::cli", %err, "runtime shutdown timed out");
316        });
317    }
318}