1#![doc(
4 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
10
11use reth_tasks::{TaskExecutor, TaskManager};
14use std::{future::Future, pin::pin, sync::mpsc, time::Duration};
15use tracing::{debug, error, trace};
16
17#[derive(Debug)]
21#[non_exhaustive]
22pub struct CliRunner {
23 tokio_runtime: tokio::runtime::Runtime,
24}
25
26impl CliRunner {
27 pub fn try_default_runtime() -> Result<Self, std::io::Error> {
32 Ok(Self { tokio_runtime: tokio_runtime()? })
33 }
34
35 pub const fn from_runtime(tokio_runtime: tokio::runtime::Runtime) -> Self {
37 Self { tokio_runtime }
38 }
39}
40
41impl CliRunner {
44 pub fn run_command_until_exit<F, E>(
50 self,
51 command: impl FnOnce(CliContext) -> F,
52 ) -> Result<(), E>
53 where
54 F: Future<Output = Result<(), E>>,
55 E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
56 {
57 let AsyncCliRunner { context, mut task_manager, tokio_runtime } =
58 AsyncCliRunner::new(self.tokio_runtime);
59
60 let command_res = tokio_runtime.block_on(run_to_completion_or_panic(
62 &mut task_manager,
63 run_until_ctrl_c(command(context)),
64 ));
65
66 if command_res.is_err() {
67 error!(target: "reth::cli", "shutting down due to error");
68 } else {
69 debug!(target: "reth::cli", "shutting down gracefully");
70 task_manager.graceful_shutdown_with_timeout(Duration::from_secs(5));
74 }
75
76 let (tx, rx) = mpsc::channel();
81 std::thread::Builder::new()
82 .name("tokio-runtime-shutdown".to_string())
83 .spawn(move || {
84 drop(tokio_runtime);
85 let _ = tx.send(());
86 })
87 .unwrap();
88
89 let _ = rx.recv_timeout(Duration::from_secs(5)).inspect_err(|err| {
90 debug!(target: "reth::cli", %err, "tokio runtime shutdown timed out");
91 });
92
93 command_res
94 }
95
96 pub fn run_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
98 where
99 F: Future<Output = Result<(), E>>,
100 E: Send + Sync + From<std::io::Error> + 'static,
101 {
102 self.tokio_runtime.block_on(run_until_ctrl_c(fut))?;
103 Ok(())
104 }
105
106 pub fn run_blocking_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
111 where
112 F: Future<Output = Result<(), E>> + Send + 'static,
113 E: Send + Sync + From<std::io::Error> + 'static,
114 {
115 let tokio_runtime = self.tokio_runtime;
116 let handle = tokio_runtime.handle().clone();
117 let fut = tokio_runtime.handle().spawn_blocking(move || handle.block_on(fut));
118 tokio_runtime
119 .block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?;
120
121 std::thread::Builder::new()
125 .name("tokio-runtime-shutdown".to_string())
126 .spawn(move || drop(tokio_runtime))
127 .unwrap();
128
129 Ok(())
130 }
131}
132
133struct AsyncCliRunner {
135 context: CliContext,
136 task_manager: TaskManager,
137 tokio_runtime: tokio::runtime::Runtime,
138}
139
140impl AsyncCliRunner {
143 fn new(tokio_runtime: tokio::runtime::Runtime) -> Self {
146 let task_manager = TaskManager::new(tokio_runtime.handle().clone());
147 let task_executor = task_manager.executor();
148 Self { context: CliContext { task_executor }, task_manager, tokio_runtime }
149 }
150}
151
152#[derive(Debug)]
154pub struct CliContext {
155 pub task_executor: TaskExecutor,
157}
158
159pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
162 tokio::runtime::Builder::new_multi_thread().enable_all().build()
163}
164
165async fn run_to_completion_or_panic<F, E>(tasks: &mut TaskManager, fut: F) -> Result<(), E>
169where
170 F: Future<Output = Result<(), E>>,
171 E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
172{
173 {
174 let fut = pin!(fut);
175 tokio::select! {
176 task_manager_result = tasks => {
177 if let Err(panicked_error) = task_manager_result {
178 return Err(panicked_error.into());
179 }
180 },
181 res = fut => res?,
182 }
183 }
184 Ok(())
185}
186
187async fn run_until_ctrl_c<F, E>(fut: F) -> Result<(), E>
191where
192 F: Future<Output = Result<(), E>>,
193 E: Send + Sync + 'static + From<std::io::Error>,
194{
195 let ctrl_c = tokio::signal::ctrl_c();
196
197 #[cfg(unix)]
198 {
199 let mut stream = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
200 let sigterm = stream.recv();
201 let sigterm = pin!(sigterm);
202 let ctrl_c = pin!(ctrl_c);
203 let fut = pin!(fut);
204
205 tokio::select! {
206 _ = ctrl_c => {
207 trace!(target: "reth::cli", "Received ctrl-c");
208 },
209 _ = sigterm => {
210 trace!(target: "reth::cli", "Received SIGTERM");
211 },
212 res = fut => res?,
213 }
214 }
215
216 #[cfg(not(unix))]
217 {
218 let ctrl_c = pin!(ctrl_c);
219 let fut = pin!(fut);
220
221 tokio::select! {
222 _ = ctrl_c => {
223 trace!(target: "reth::cli", "Received ctrl-c");
224 },
225 res = fut => res?,
226 }
227 }
228
229 Ok(())
230}