reth_cli_runner/
lib.rs
1#![doc(
4 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
10
11use reth_tasks::{TaskExecutor, TaskManager};
14use std::{future::Future, pin::pin, sync::mpsc, time::Duration};
15use tracing::{debug, error, trace};
16
17#[derive(Debug)]
21#[non_exhaustive]
22pub struct CliRunner {
23 tokio_runtime: tokio::runtime::Runtime,
24}
25
26impl CliRunner {
27 pub fn try_default_runtime() -> Result<Self, std::io::Error> {
32 Ok(Self { tokio_runtime: tokio_runtime()? })
33 }
34
35 pub fn from_runtime(tokio_runtime: tokio::runtime::Runtime) -> Self {
37 Self { tokio_runtime }
38 }
39}
40
41impl CliRunner {
44 pub fn run_command_until_exit<F, E>(
50 self,
51 command: impl FnOnce(CliContext) -> F,
52 ) -> Result<(), E>
53 where
54 F: Future<Output = Result<(), E>>,
55 E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
56 {
57 let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
58 AsyncCliRunner::new(self.tokio_runtime);
59
60 let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
62 &mut task_manager,
63 run_until_ctrl_c(command(context)),
64 ));
65
66 if command_res.is_err() {
67 error!(target: "reth::cli", "shutting down due to error");
68 } else {
69 debug!(target: "reth::cli", "shutting down gracefully");
70 task_manager.graceful_shutdown_with_timeout(Duration::from_secs(5));
74 }
75
76 let (tx, rx) = mpsc::channel();
81 std::thread::Builder::new()
82 .name("tokio-runtime-shutdown".to_string())
83 .spawn(move || {
84 drop(tokio_runtime);
85 let _ = tx.send(());
86 })
87 .unwrap();
88
89 let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
90 debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
91 });
92
93 command_res
94 }
95
96 pub fn run_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
98 where
99 F: Future<Output = Result<(), E>>,
100 E: Send + Sync + From<std::io::Error> + 'static,
101 {
102 let tokio_runtime = tokio_runtime()?;
103 tokio_runtime.block_on(run_until_ctrl_c(fut))?;
104 Ok(())
105 }
106
107 pub fn run_blocking_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
112 where
113 F: Future<Output = Result<(), E>> + Send + 'static,
114 E: Send + Sync + From<std::io::Error> + 'static,
115 {
116 let tokio_runtime = tokio_runtime()?;
117 let handle = tokio_runtime.handle().clone();
118 let fut = tokio_runtime.handle().spawn_blocking(move || handle.block_on(fut));
119 tokio_runtime
120 .block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
121
122 std::thread::Builder::new()
126 .name("tokio-runtime-shutdown".to_string())
127 .spawn(move || drop(tokio_runtime))
128 .unwrap();
129
130 Ok(())
131 }
132}
133
134struct AsyncCliRunner {
136 context: CliContext,
137 task_manager: TaskManager,
138 tokio_runtime: tokio::runtime::Runtime,
139}
140
141impl AsyncCliRunner {
144 fn new(tokio_runtime: tokio::runtime::Runtime) -> Self {
147 let task_manager = TaskManager::new(tokio_runtime.handle().clone());
148 let task_executor = task_manager.executor();
149 Self { context: CliContext { task_executor }, task_manager, tokio_runtime }
150 }
151}
152
153#[derive(Debug)]
155pub struct CliContext {
156 pub task_executor: TaskExecutor,
158}
159
160pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
163 tokio::runtime::Builder::new_multi_thread().enable_all().build()
164}
165
166async fn run_to_completion_or_panic<F, E>(tasks: &mut TaskManager, fut: F) -> Result<(), E>
170where
171 F: Future<Output = Result<(), E>>,
172 E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
173{
174 {
175 let fut = pin!(fut);
176 tokio::select! {
177 err = tasks => {
178 return Err(err.into())
179 },
180 res = fut => res?,
181 }
182 }
183 Ok(())
184}
185
186async fn run_until_ctrl_c<F, E>(fut: F) -> Result<(), E>
190where
191 F: Future<Output = Result<(), E>>,
192 E: Send + Sync + 'static + From<std::io::Error>,
193{
194 let ctrl_c = tokio::signal::ctrl_c();
195
196 #[cfg(unix)]
197 {
198 let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
199 let sigterm = stream.recv();
200 let sigterm = pin!(sigterm);
201 let ctrl_c = pin!(ctrl_c);
202 let fut = pin!(fut);
203
204 tokio::select! {
205 _ = ctrl_c => {
206 trace!(target: "reth::cli", "Received ctrl-c");
207 },
208 _ = sigterm => {
209 trace!(target: "reth::cli", "Received SIGTERM");
210 },
211 res = fut => res?,
212 }
213 }
214
215 #[cfg(not(unix))]
216 {
217 let ctrl_c = pin!(ctrl_c);
218 let fut = pin!(fut);
219
220 tokio::select! {
221 _ = ctrl_c => {
222 trace!(target: "reth::cli", "Received ctrl-c");
223 },
224 res = fut => res?,
225 }
226 }
227
228 Ok(())
229}