reth_cli_runner/lib.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
//! A tokio based CLI runner.
#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
//! Entrypoint for running commands.
use reth_tasks::{TaskExecutor, TaskManager};
use std::{future::Future, pin::pin, sync::mpsc, time::Duration};
use tracing::{debug, error, trace};
/// Executes CLI commands.
///
/// Provides utilities for running a cli command to completion.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct CliRunner;
// === impl CliRunner ===
impl CliRunner {
/// Executes the given _async_ command on the tokio runtime until the command future resolves or
/// until the process receives a `SIGINT` or `SIGTERM` signal.
///
/// Tasks spawned by the command via the [`TaskExecutor`] are shut down and an attempt is made
/// to drive their shutdown to completion after the command has finished.
pub fn run_command_until_exit<F, E>(
self,
command: impl FnOnce(CliContext) -> F,
) -> Result<(), E>
where
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
{
let AsyncCliRunner { context, mut task_manager, tokio_runtime } = AsyncCliRunner::new()?;
// Executes the command until it finished or ctrl-c was fired
let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
&mut task_manager,
run_until_ctrl_c(command(context)),
));
if command_res.is_err() {
error!(target: "reth::cli", "shutting down due to error");
} else {
debug!(target: "reth::cli", "shutting down gracefully");
// after the command has finished or exit signal was received we shutdown the task
// manager which fires the shutdown signal to all tasks spawned via the task
// executor and awaiting on tasks spawned with graceful shutdown
task_manager.graceful_shutdown_with_timeout(Duration::from_secs(5));
}
// `drop(tokio_runtime)` would block the current thread until its pools
// (including blocking pool) are shutdown. Since we want to exit as soon as possible, drop
// it on a separate thread and wait for up to 5 seconds for this operation to
// complete.
let (tx, rx) = mpsc::channel();
std::thread::Builder::new()
.name("tokio-runtime-shutdown".to_string())
.spawn(move || {
drop(tokio_runtime);
let _ = tx.send(());
})
.unwrap();
let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
});
command_res
}
/// Executes a regular future until completion or until external signal received.
pub fn run_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
where
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<std::io::Error> + 'static,
{
let tokio_runtime = tokio_runtime()?;
tokio_runtime.block_on(run_until_ctrl_c(fut))?;
Ok(())
}
/// Executes a regular future as a spawned blocking task until completion or until external
/// signal received.
///
/// See [`Runtime::spawn_blocking`](tokio::runtime::Runtime::spawn_blocking) .
pub fn run_blocking_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
where
F: Future<Output = Result<(), E>> + Send + 'static,
E: Send + Sync + From<std::io::Error> + 'static,
{
let tokio_runtime = tokio_runtime()?;
let handle = tokio_runtime.handle().clone();
let fut = tokio_runtime.handle().spawn_blocking(move || handle.block_on(fut));
tokio_runtime
.block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
// drop the tokio runtime on a separate thread because drop blocks until its pools
// (including blocking pool) are shutdown. In other words `drop(tokio_runtime)` would block
// the current thread but we want to exit right away.
std::thread::Builder::new()
.name("tokio-runtime-shutdown".to_string())
.spawn(move || drop(tokio_runtime))
.unwrap();
Ok(())
}
}
/// [`CliRunner`] configuration when executing commands asynchronously
struct AsyncCliRunner {
context: CliContext,
task_manager: TaskManager,
tokio_runtime: tokio::runtime::Runtime,
}
// === impl AsyncCliRunner ===
impl AsyncCliRunner {
/// Attempts to create a tokio Runtime and additional context required to execute commands
/// asynchronously.
fn new() -> Result<Self, std::io::Error> {
let tokio_runtime = tokio_runtime()?;
let task_manager = TaskManager::new(tokio_runtime.handle().clone());
let task_executor = task_manager.executor();
Ok(Self { context: CliContext { task_executor }, task_manager, tokio_runtime })
}
}
/// Additional context provided by the [`CliRunner`] when executing commands
#[derive(Debug)]
pub struct CliContext {
/// Used to execute/spawn tasks
pub task_executor: TaskExecutor,
}
/// Creates a new default tokio multi-thread [Runtime](tokio::runtime::Runtime) with all features
/// enabled
pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
tokio::runtime::Builder::new_multi_thread().enable_all().build()
}
/// Runs the given future to completion or until a critical task panicked.
///
/// Returns the error if a task panicked, or the given future returned an error.
async fn run_to_completion_or_panic<F, E>(tasks: &mut TaskManager, fut: F) -> Result<(), E>
where
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
{
{
let fut = pin!(fut);
tokio::select! {
err = tasks => {
return Err(err.into())
},
res = fut => res?,
}
}
Ok(())
}
/// Runs the future to completion or until:
/// - `ctrl-c` is received.
/// - `SIGTERM` is received (unix only).
async fn run_until_ctrl_c<F, E>(fut: F) -> Result<(), E>
where
F: Future<Output = Result<(), E>>,
E: Send + Sync + 'static + From<std::io::Error>,
{
let ctrl_c = tokio::signal::ctrl_c();
#[cfg(unix)]
{
let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
let sigterm = stream.recv();
let sigterm = pin!(sigterm);
let ctrl_c = pin!(ctrl_c);
let fut = pin!(fut);
tokio::select! {
_ = ctrl_c => {
trace!(target: "reth::cli", "Received ctrl-c");
},
_ = sigterm => {
trace!(target: "reth::cli", "Received SIGTERM");
},
res = fut => res?,
}
}
#[cfg(not(unix))]
{
let ctrl_c = pin!(ctrl_c);
let fut = pin!(fut);
tokio::select! {
_ = ctrl_c => {
trace!(target: "reth::cli", "Received ctrl-c");
},
res = fut => res?,
}
}
Ok(())
}