reth_cli_runner/
lib.rs

1//! A tokio based CLI runner.
2
3#![doc(
4    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
10
11//! Entrypoint for running commands.
12
13use reth_tasks::{TaskExecutor, TaskManager};
14use std::{future::Future, pin::pin, sync::mpsc, time::Duration};
15use tracing::{debug, error, trace};
16
/// Executes CLI commands.
///
/// Owns a tokio [`Runtime`](tokio::runtime::Runtime) and provides utilities for running a CLI
/// command on that runtime to completion.
#[derive(Debug)]
#[non_exhaustive]
pub struct CliRunner {
    /// Runtime on which command futures are driven via `block_on`.
    tokio_runtime: tokio::runtime::Runtime,
}
25
26impl CliRunner {
27    /// Attempts to create a new [`CliRunner`] using the default tokio
28    /// [`Runtime`](tokio::runtime::Runtime).
29    ///
30    /// The default tokio runtime is multi-threaded, with both I/O and time drivers enabled.
31    pub fn try_default_runtime() -> Result<Self, std::io::Error> {
32        Ok(Self { tokio_runtime: tokio_runtime()? })
33    }
34
35    /// Create a new [`CliRunner`] from a provided tokio [`Runtime`](tokio::runtime::Runtime).
36    pub const fn from_runtime(tokio_runtime: tokio::runtime::Runtime) -> Self {
37        Self { tokio_runtime }
38    }
39}
40
41// === impl CliRunner ===
42
43impl CliRunner {
44    /// Executes the given _async_ command on the tokio runtime until the command future resolves or
45    /// until the process receives a `SIGINT` or `SIGTERM` signal.
46    ///
47    /// Tasks spawned by the command via the [`TaskExecutor`] are shut down and an attempt is made
48    /// to drive their shutdown to completion after the command has finished.
49    pub fn run_command_until_exit<F, E>(
50        self,
51        command: impl FnOnce(CliContext) -> F,
52    ) -> Result<(), E>
53    where
54        F: Future<Output = Result<(), E>>,
55        E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
56    {
57        let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
58            AsyncCliRunner::new(self.tokio_runtime);
59
60        // Executes the command until it finished or ctrl-c was fired
61        let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
62            &mut task_manager,
63            run_until_ctrl_c(command(context)),
64        ));
65
66        if command_res.is_err() {
67            error!(target: "reth::cli", "shutting down due to error");
68        } else {
69            debug!(target: "reth::cli", "shutting down gracefully");
70            // after the command has finished or exit signal was received we shutdown the task
71            // manager which fires the shutdown signal to all tasks spawned via the task
72            // executor and awaiting on tasks spawned with graceful shutdown
73            task_manager.graceful_shutdown_with_timeout(Duration::from_secs(5));
74        }
75
76        // `drop(tokio_runtime)` would block the current thread until its pools
77        // (including blocking pool) are shutdown. Since we want to exit as soon as possible, drop
78        // it on a separate thread and wait for up to 5 seconds for this operation to
79        // complete.
80        let (tx, rx) = mpsc::channel();
81        std::thread::Builder::new()
82            .name("tokio-runtime-shutdown".to_string())
83            .spawn(move || {
84                drop(tokio_runtime);
85                let _ = tx.send(());
86            })
87            .unwrap();
88
89        let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
90            debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
91        });
92
93        command_res
94    }
95
96    /// Executes a regular future until completion or until external signal received.
97    pub fn run_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
98    where
99        F: Future<Output = Result<(), E>>,
100        E: Send + Sync + From<std::io::Error> + 'static,
101    {
102        self.tokio_runtime.block_on(run_until_ctrl_c(fut))?;
103        Ok(())
104    }
105
106    /// Executes a regular future as a spawned blocking task until completion or until external
107    /// signal received.
108    ///
109    /// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking) .
110    pub fn run_blocking_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
111    where
112        F: Future<Output = Result<(), E>> + Send + 'static,
113        E: Send + Sync + From<std::io::Error> + 'static,
114    {
115        let tokio_runtime = self.tokio_runtime;
116        let handle = tokio_runtime.handle().clone();
117        let fut = tokio_runtime.handle().spawn_blocking(move || handle.block_on(fut));
118        tokio_runtime
119            .block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
120
121        // drop the tokio runtime on a separate thread because drop blocks until its pools
122        // (including blocking pool) are shutdown. In other words `drop(tokio_runtime)` would block
123        // the current thread but we want to exit right away.
124        std::thread::Builder::new()
125            .name("tokio-runtime-shutdown".to_string())
126            .spawn(move || drop(tokio_runtime))
127            .unwrap();
128
129        Ok(())
130    }
131}
132
/// [`CliRunner`] configuration when executing commands asynchronously.
///
/// Bundles the runtime with the [`TaskManager`] created on it and the [`CliContext`] handed to
/// the command.
struct AsyncCliRunner {
    /// Context passed to the command; carries the task executor obtained from `task_manager`.
    context: CliContext,
    /// Task manager created from the runtime's handle.
    task_manager: TaskManager,
    /// The tokio runtime the command is executed on.
    tokio_runtime: tokio::runtime::Runtime,
}
139
140// === impl AsyncCliRunner ===
141
142impl AsyncCliRunner {
143    /// Given a tokio [`Runtime`](tokio::runtime::Runtime), creates additional context required to
144    /// execute commands asynchronously.
145    fn new(tokio_runtime: tokio::runtime::Runtime) -> Self {
146        let task_manager = TaskManager::new(tokio_runtime.handle().clone());
147        let task_executor = task_manager.executor();
148        Self { context: CliContext { task_executor }, task_manager, tokio_runtime }
149    }
150}
151
/// Additional context provided by the [`CliRunner`] when executing commands.
///
/// An instance is handed to the command closure given to
/// [`CliRunner::run_command_until_exit`].
#[derive(Debug)]
pub struct CliContext {
    /// Used to execute/spawn tasks.
    pub task_executor: TaskExecutor,
}
158
159/// Creates a new default tokio multi-thread [Runtime](tokio::runtime::Runtime) with all features
160/// enabled
161pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
162    tokio::runtime::Builder::new_multi_thread().enable_all().build()
163}
164
165/// Runs the given future to completion or until a critical task panicked.
166///
167/// Returns the error if a task panicked, or the given future returned an error.
168async fn run_to_completion_or_panic<F, E>(tasks: &mut TaskManager, fut: F) -> Result<(), E>
169where
170    F: Future<Output = Result<(), E>>,
171    E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
172{
173    {
174        let fut = pin!(fut);
175        tokio::select! {
176            task_manager_result = tasks => {
177                if let Err(panicked_error) = task_manager_result {
178                    return Err(panicked_error.into());
179                }
180            },
181            res = fut => res?,
182        }
183    }
184    Ok(())
185}
186
187/// Runs the future to completion or until:
188/// - `ctrl-c` is received.
189/// - `SIGTERM` is received (unix only).
190async fn run_until_ctrl_c<F, E>(fut: F) -> Result<(), E>
191where
192    F: Future<Output = Result<(), E>>,
193    E: Send + Sync + 'static + From<std::io::Error>,
194{
195    let ctrl_c = tokio::signal::ctrl_c();
196
197    #[cfg(unix)]
198    {
199        let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
200        let sigterm = stream.recv();
201        let sigterm = pin!(sigterm);
202        let ctrl_c = pin!(ctrl_c);
203        let fut = pin!(fut);
204
205        tokio::select! {
206            _ = ctrl_c => {
207                trace!(target: "reth::cli", "Received ctrl-c");
208            },
209            _ = sigterm => {
210                trace!(target: "reth::cli", "Received SIGTERM");
211            },
212            res = fut => res?,
213        }
214    }
215
216    #[cfg(not(unix))]
217    {
218        let ctrl_c = pin!(ctrl_c);
219        let fut = pin!(fut);
220
221        tokio::select! {
222            _ = ctrl_c => {
223                trace!(target: "reth::cli", "Received ctrl-c");
224            },
225            res = fut => res?,
226        }
227    }
228
229    Ok(())
230}