1#[cfg(feature = "rayon")]
10use crate::pool::{BlockingTaskGuard, BlockingTaskPool, WorkerPool};
11use crate::{
12 metrics::{IncCounterOnDrop, TaskExecutorMetrics},
13 shutdown::{GracefulShutdown, GracefulShutdownGuard, Shutdown},
14 worker_map::WorkerMap,
15 PanickedTaskError, TaskEvent, TaskManager,
16};
17use futures_util::{future::select, Future, FutureExt, TryFutureExt};
18#[cfg(feature = "rayon")]
19use std::{num::NonZeroUsize, thread::available_parallelism};
20use std::{
21 pin::pin,
22 sync::{
23 atomic::{AtomicUsize, Ordering},
24 Arc, Mutex,
25 },
26 time::{Duration, Instant},
27};
28use tokio::{runtime::Handle, sync::mpsc::UnboundedSender, task::JoinHandle};
29use tracing::{debug, error};
30use tracing_futures::Instrument;
31
32use tokio::runtime::Runtime as TokioRuntime;
33
34pub const DEFAULT_THREAD_KEEP_ALIVE: Duration = Duration::from_secs(15);
36
37pub const DEFAULT_RESERVED_CPU_CORES: usize = 2;
39
40pub const DEFAULT_STORAGE_POOL_THREADS: usize = 16;
42
43pub const DEFAULT_MAX_BLOCKING_TASKS: usize = 512;
45
46#[derive(Debug, Clone)]
48pub enum TokioConfig {
49 Owned {
51 worker_threads: Option<usize>,
53 thread_keep_alive: Duration,
55 thread_name: &'static str,
57 },
58 ExistingHandle(Handle),
60}
61
62impl Default for TokioConfig {
63 fn default() -> Self {
64 Self::Owned {
65 worker_threads: None,
66 thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
67 thread_name: "tokio-rt",
68 }
69 }
70}
71
72impl TokioConfig {
73 pub const fn existing_handle(handle: Handle) -> Self {
75 Self::ExistingHandle(handle)
76 }
77
78 pub const fn with_worker_threads(worker_threads: usize) -> Self {
80 Self::Owned {
81 worker_threads: Some(worker_threads),
82 thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
83 thread_name: "tokio-rt",
84 }
85 }
86}
87
88#[derive(Debug, Clone)]
90#[cfg(feature = "rayon")]
91pub struct RayonConfig {
92 pub cpu_threads: Option<usize>,
95 pub reserved_cpu_cores: usize,
97 pub rpc_threads: Option<usize>,
100 pub storage_threads: Option<usize>,
103 pub max_blocking_tasks: usize,
105 pub proof_storage_worker_threads: Option<usize>,
108 pub proof_account_worker_threads: Option<usize>,
111 pub prewarming_threads: Option<usize>,
114}
115
116#[cfg(feature = "rayon")]
117impl Default for RayonConfig {
118 fn default() -> Self {
119 Self {
120 cpu_threads: None,
121 reserved_cpu_cores: DEFAULT_RESERVED_CPU_CORES,
122 rpc_threads: None,
123 storage_threads: None,
124 max_blocking_tasks: DEFAULT_MAX_BLOCKING_TASKS,
125 proof_storage_worker_threads: None,
126 proof_account_worker_threads: None,
127 prewarming_threads: None,
128 }
129 }
130}
131
132#[cfg(feature = "rayon")]
133impl RayonConfig {
134 pub const fn with_reserved_cpu_cores(mut self, reserved_cpu_cores: usize) -> Self {
136 self.reserved_cpu_cores = reserved_cpu_cores;
137 self
138 }
139
140 pub const fn with_max_blocking_tasks(mut self, max_blocking_tasks: usize) -> Self {
142 self.max_blocking_tasks = max_blocking_tasks;
143 self
144 }
145
146 pub const fn with_rpc_threads(mut self, rpc_threads: usize) -> Self {
148 self.rpc_threads = Some(rpc_threads);
149 self
150 }
151
152 pub const fn with_storage_threads(mut self, storage_threads: usize) -> Self {
154 self.storage_threads = Some(storage_threads);
155 self
156 }
157
158 pub const fn with_proof_storage_worker_threads(
160 mut self,
161 proof_storage_worker_threads: usize,
162 ) -> Self {
163 self.proof_storage_worker_threads = Some(proof_storage_worker_threads);
164 self
165 }
166
167 pub const fn with_proof_account_worker_threads(
169 mut self,
170 proof_account_worker_threads: usize,
171 ) -> Self {
172 self.proof_account_worker_threads = Some(proof_account_worker_threads);
173 self
174 }
175
176 pub const fn with_prewarming_threads(mut self, prewarming_threads: usize) -> Self {
178 self.prewarming_threads = Some(prewarming_threads);
179 self
180 }
181
182 fn default_thread_count(&self) -> usize {
184 let _ = self.reserved_cpu_cores;
187 self.cpu_threads.unwrap_or_else(|| available_parallelism().map_or(1, NonZeroUsize::get))
188 }
189}
190
191#[derive(Debug, Clone, Default)]
193pub struct RuntimeConfig {
194 pub tokio: TokioConfig,
196 #[cfg(feature = "rayon")]
198 pub rayon: RayonConfig,
199}
200
201impl RuntimeConfig {
202 pub fn with_tokio(mut self, tokio: TokioConfig) -> Self {
204 self.tokio = tokio;
205 self
206 }
207
208 #[cfg(feature = "rayon")]
210 pub const fn with_rayon(mut self, rayon: RayonConfig) -> Self {
211 self.rayon = rayon;
212 self
213 }
214}
215
216#[derive(Debug, thiserror::Error)]
218pub enum RuntimeBuildError {
219 #[error("Failed to build tokio runtime: {0}")]
221 TokioBuild(#[from] std::io::Error),
222 #[cfg(feature = "rayon")]
224 #[error("Failed to build rayon thread pool: {0}")]
225 RayonBuild(#[from] rayon::ThreadPoolBuildError),
226}
227
228struct RuntimeInner {
231 _tokio_runtime: Option<TokioRuntime>,
233 handle: Handle,
235 on_shutdown: Shutdown,
237 task_events_tx: UnboundedSender<TaskEvent>,
239 metrics: TaskExecutorMetrics,
241 graceful_tasks: Arc<AtomicUsize>,
243 #[cfg(feature = "rayon")]
245 cpu_pool: rayon::ThreadPool,
246 #[cfg(feature = "rayon")]
248 rpc_pool: BlockingTaskPool,
249 #[cfg(feature = "rayon")]
251 storage_pool: rayon::ThreadPool,
252 #[cfg(feature = "rayon")]
254 blocking_guard: BlockingTaskGuard,
255 #[cfg(feature = "rayon")]
257 proof_storage_worker_pool: WorkerPool,
258 #[cfg(feature = "rayon")]
260 proof_account_worker_pool: WorkerPool,
261 #[cfg(feature = "rayon")]
263 prewarming_pool: WorkerPool,
264 worker_map: WorkerMap,
267 task_manager_handle: Mutex<Option<JoinHandle<Result<(), PanickedTaskError>>>>,
271}
272
273#[derive(Clone)]
282pub struct Runtime(Arc<RuntimeInner>);
283
284impl std::fmt::Debug for Runtime {
285 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
286 f.debug_struct("Runtime").field("handle", &self.0.handle).finish()
287 }
288}
289
290impl Runtime {
293 pub fn take_task_manager_handle(&self) -> Option<JoinHandle<Result<(), PanickedTaskError>>> {
299 self.0.task_manager_handle.lock().unwrap().take()
300 }
301
302 pub fn handle(&self) -> &Handle {
304 &self.0.handle
305 }
306
307 #[cfg(feature = "rayon")]
309 pub fn cpu_pool(&self) -> &rayon::ThreadPool {
310 &self.0.cpu_pool
311 }
312
313 #[cfg(feature = "rayon")]
315 pub fn rpc_pool(&self) -> &BlockingTaskPool {
316 &self.0.rpc_pool
317 }
318
319 #[cfg(feature = "rayon")]
321 pub fn storage_pool(&self) -> &rayon::ThreadPool {
322 &self.0.storage_pool
323 }
324
325 #[cfg(feature = "rayon")]
327 pub fn blocking_guard(&self) -> BlockingTaskGuard {
328 self.0.blocking_guard.clone()
329 }
330
331 #[cfg(feature = "rayon")]
333 pub fn proof_storage_worker_pool(&self) -> &WorkerPool {
334 &self.0.proof_storage_worker_pool
335 }
336
337 #[cfg(feature = "rayon")]
339 pub fn proof_account_worker_pool(&self) -> &WorkerPool {
340 &self.0.proof_account_worker_pool
341 }
342
343 #[cfg(feature = "rayon")]
345 pub fn prewarming_pool(&self) -> &WorkerPool {
346 &self.0.prewarming_pool
347 }
348}
349
350impl Runtime {
353 pub fn test() -> Self {
358 let config = match Handle::try_current() {
359 Ok(handle) => Self::test_config().with_tokio(TokioConfig::existing_handle(handle)),
360 Err(_) => Self::test_config(),
361 };
362 RuntimeBuilder::new(config).build().expect("failed to build test Runtime")
363 }
364
365 const fn test_config() -> RuntimeConfig {
366 RuntimeConfig {
367 tokio: TokioConfig::Owned {
368 worker_threads: Some(2),
369 thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
370 thread_name: "tokio-test",
371 },
372 #[cfg(feature = "rayon")]
373 rayon: RayonConfig {
374 cpu_threads: Some(2),
375 reserved_cpu_cores: 0,
376 rpc_threads: Some(2),
377 storage_threads: Some(2),
378 max_blocking_tasks: 16,
379 proof_storage_worker_threads: Some(2),
380 proof_account_worker_threads: Some(2),
381 prewarming_threads: Some(2),
382 },
383 }
384 }
385}
386
387enum TaskKind {
391 Default,
393 Blocking,
395}
396
397impl Runtime {
398 pub fn on_shutdown_signal(&self) -> &Shutdown {
400 &self.0.on_shutdown
401 }
402
403 fn spawn_on_rt<F>(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()>
405 where
406 F: Future<Output = ()> + Send + 'static,
407 {
408 match task_kind {
409 TaskKind::Default => self.0.handle.spawn(fut),
410 TaskKind::Blocking => {
411 let handle = self.0.handle.clone();
412 self.0.handle.spawn_blocking(move || handle.block_on(fut))
413 }
414 }
415 }
416
417 fn spawn_task_as<F>(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()>
419 where
420 F: Future<Output = ()> + Send + 'static,
421 {
422 match task_kind {
423 TaskKind::Default => self.0.metrics.inc_regular_tasks(),
424 TaskKind::Blocking => self.0.metrics.inc_regular_blocking_tasks(),
425 }
426 let on_shutdown = self.0.on_shutdown.clone();
427
428 let finished_counter = match task_kind {
429 TaskKind::Default => self.0.metrics.finished_regular_tasks_total.clone(),
430 TaskKind::Blocking => self.0.metrics.finished_regular_blocking_tasks_total.clone(),
431 };
432
433 let task = {
434 async move {
435 let _inc_counter_on_drop = IncCounterOnDrop::new(finished_counter);
436 let fut = pin!(fut);
437 let _ = select(on_shutdown, fut).await;
438 }
439 }
440 .in_current_span();
441
442 self.spawn_on_rt(task, task_kind)
443 }
444
445 pub fn spawn_task<F>(&self, fut: F) -> JoinHandle<()>
450 where
451 F: Future<Output = ()> + Send + 'static,
452 {
453 self.spawn_task_as(fut, TaskKind::Default)
454 }
455
456 pub fn spawn_blocking_task<F>(&self, fut: F) -> JoinHandle<()>
461 where
462 F: Future<Output = ()> + Send + 'static,
463 {
464 self.spawn_task_as(fut, TaskKind::Blocking)
465 }
466
467 pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
470 where
471 F: FnOnce() -> R + Send + 'static,
472 R: Send + 'static,
473 {
474 self.0.handle.spawn_blocking(func)
475 }
476
477 pub fn spawn_blocking_named<F, R>(&self, name: &'static str, func: F) -> crate::LazyHandle<R>
489 where
490 F: FnOnce() -> R + Send + 'static,
491 R: Send + 'static,
492 {
493 crate::LazyHandle::new(self.0.worker_map.spawn_on(name, func))
494 }
495
496 pub fn spawn_with_signal<F>(&self, f: impl FnOnce(Shutdown) -> F) -> JoinHandle<()>
501 where
502 F: Future<Output = ()> + Send + 'static,
503 {
504 let on_shutdown = self.0.on_shutdown.clone();
505 let fut = f(on_shutdown);
506 let task = fut.in_current_span();
507 self.0.handle.spawn(task)
508 }
509
510 fn spawn_critical_as<F>(
512 &self,
513 name: &'static str,
514 fut: F,
515 task_kind: TaskKind,
516 ) -> JoinHandle<()>
517 where
518 F: Future<Output = ()> + Send + 'static,
519 {
520 self.0.metrics.inc_critical_tasks();
521 let panicked_tasks_tx = self.0.task_events_tx.clone();
522 let on_shutdown = self.0.on_shutdown.clone();
523
524 let task = std::panic::AssertUnwindSafe(fut)
526 .catch_unwind()
527 .map_err(move |error| {
528 let task_error = PanickedTaskError::new(name, error);
529 error!("{task_error}");
530 let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
531 })
532 .in_current_span();
533
534 let finished_critical_tasks_total_metrics =
535 self.0.metrics.finished_critical_tasks_total.clone();
536 let task = async move {
537 let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_total_metrics);
538 let task = pin!(task);
539 let _ = select(on_shutdown, task).await;
540 };
541
542 self.spawn_on_rt(task, task_kind)
543 }
544
545 pub fn spawn_critical_task<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
550 where
551 F: Future<Output = ()> + Send + 'static,
552 {
553 self.spawn_critical_as(name, fut, TaskKind::Default)
554 }
555
556 pub fn spawn_critical_blocking_task<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
561 where
562 F: Future<Output = ()> + Send + 'static,
563 {
564 self.spawn_critical_as(name, fut, TaskKind::Blocking)
565 }
566
567 pub fn spawn_critical_with_shutdown_signal<F>(
571 &self,
572 name: &'static str,
573 f: impl FnOnce(Shutdown) -> F,
574 ) -> JoinHandle<()>
575 where
576 F: Future<Output = ()> + Send + 'static,
577 {
578 let panicked_tasks_tx = self.0.task_events_tx.clone();
579 let on_shutdown = self.0.on_shutdown.clone();
580 let fut = f(on_shutdown);
581
582 let task = std::panic::AssertUnwindSafe(fut)
584 .catch_unwind()
585 .map_err(move |error| {
586 let task_error = PanickedTaskError::new(name, error);
587 error!("{task_error}");
588 let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
589 })
590 .map(drop)
591 .in_current_span();
592
593 self.0.handle.spawn(task)
594 }
595
596 pub fn spawn_critical_with_graceful_shutdown_signal<F>(
617 &self,
618 name: &'static str,
619 f: impl FnOnce(GracefulShutdown) -> F,
620 ) -> JoinHandle<()>
621 where
622 F: Future<Output = ()> + Send + 'static,
623 {
624 let panicked_tasks_tx = self.0.task_events_tx.clone();
625 let on_shutdown = GracefulShutdown::new(
626 self.0.on_shutdown.clone(),
627 GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
628 );
629 let fut = f(on_shutdown);
630
631 let task = std::panic::AssertUnwindSafe(fut)
633 .catch_unwind()
634 .map_err(move |error| {
635 let task_error = PanickedTaskError::new(name, error);
636 error!("{task_error}");
637 let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
638 })
639 .map(drop)
640 .in_current_span();
641
642 self.0.handle.spawn(task)
643 }
644
645 pub fn spawn_with_graceful_shutdown_signal<F>(
665 &self,
666 f: impl FnOnce(GracefulShutdown) -> F,
667 ) -> JoinHandle<()>
668 where
669 F: Future<Output = ()> + Send + 'static,
670 {
671 let on_shutdown = GracefulShutdown::new(
672 self.0.on_shutdown.clone(),
673 GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
674 );
675 let fut = f(on_shutdown);
676
677 self.0.handle.spawn(fut)
678 }
679
680 pub fn initiate_graceful_shutdown(
684 &self,
685 ) -> Result<GracefulShutdown, tokio::sync::mpsc::error::SendError<()>> {
686 self.0
687 .task_events_tx
688 .send(TaskEvent::GracefulShutdown)
689 .map_err(|_send_error_with_task_event| tokio::sync::mpsc::error::SendError(()))?;
690
691 Ok(GracefulShutdown::new(
692 self.0.on_shutdown.clone(),
693 GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
694 ))
695 }
696
697 pub fn graceful_shutdown(&self) {
699 let _ = self.do_graceful_shutdown(None);
700 }
701
702 pub fn graceful_shutdown_with_timeout(&self, timeout: Duration) -> bool {
707 self.do_graceful_shutdown(Some(timeout))
708 }
709
710 fn do_graceful_shutdown(&self, timeout: Option<Duration>) -> bool {
711 let _ = self.0.task_events_tx.send(TaskEvent::GracefulShutdown);
712 let deadline = timeout.map(|t| Instant::now() + t);
713 while self.0.graceful_tasks.load(Ordering::SeqCst) > 0 {
714 if deadline.is_some_and(|d| Instant::now() > d) {
715 debug!("graceful shutdown timed out");
716 return false;
717 }
718 std::thread::yield_now();
719 }
720 debug!("gracefully shut down");
721 true
722 }
723}
724
725#[derive(Debug, Clone)]
729pub struct RuntimeBuilder {
730 config: RuntimeConfig,
731}
732
733impl RuntimeBuilder {
734 pub const fn new(config: RuntimeConfig) -> Self {
736 Self { config }
737 }
738
739 #[tracing::instrument(name = "RuntimeBuilder::build", level = "debug", skip_all)]
745 pub fn build(self) -> Result<Runtime, RuntimeBuildError> {
746 debug!(?self.config, "Building runtime");
747 let config = self.config;
748
749 let (owned_runtime, handle) = match &config.tokio {
750 TokioConfig::Owned { worker_threads, thread_keep_alive, thread_name } => {
751 let mut builder = tokio::runtime::Builder::new_multi_thread();
752 builder
753 .enable_all()
754 .thread_keep_alive(*thread_keep_alive)
755 .thread_name(*thread_name);
756
757 if let Some(threads) = worker_threads {
758 builder.worker_threads(*threads);
759 }
760
761 let runtime = builder.build()?;
762 let h = runtime.handle().clone();
763 (Some(runtime), h)
764 }
765 TokioConfig::ExistingHandle(h) => (None, h.clone()),
766 };
767
768 let (task_manager, on_shutdown, task_events_tx, graceful_tasks) =
769 TaskManager::new_parts(handle.clone());
770
771 #[cfg(feature = "rayon")]
772 let (
773 cpu_pool,
774 rpc_pool,
775 storage_pool,
776 blocking_guard,
777 proof_storage_worker_pool,
778 proof_account_worker_pool,
779 prewarming_pool,
780 ) = {
781 let default_threads = config.rayon.default_thread_count();
782 let rpc_threads = config.rayon.rpc_threads.unwrap_or(default_threads);
783
784 let cpu_pool = rayon::ThreadPoolBuilder::new()
785 .num_threads(default_threads)
786 .thread_name(|i| format!("cpu-{i:02}"))
787 .build()?;
788
789 let rpc_raw = rayon::ThreadPoolBuilder::new()
790 .num_threads(rpc_threads)
791 .thread_name(|i| format!("rpc-{i:02}"))
792 .build()?;
793 let rpc_pool = BlockingTaskPool::new(rpc_raw);
794
795 let storage_threads =
796 config.rayon.storage_threads.unwrap_or(DEFAULT_STORAGE_POOL_THREADS);
797 let storage_pool = rayon::ThreadPoolBuilder::new()
798 .num_threads(storage_threads)
799 .thread_name(|i| format!("storage-{i:02}"))
800 .build()?;
801
802 let blocking_guard = BlockingTaskGuard::new(config.rayon.max_blocking_tasks);
803
804 let proof_storage_worker_threads =
805 config.rayon.proof_storage_worker_threads.unwrap_or(default_threads * 2);
806 let proof_storage_worker_pool = WorkerPool::from_builder(
807 rayon::ThreadPoolBuilder::new()
808 .num_threads(proof_storage_worker_threads)
809 .thread_name(|i| format!("proof-strg-{i:02}")),
810 )?;
811
812 let proof_account_worker_threads =
813 config.rayon.proof_account_worker_threads.unwrap_or(default_threads * 2);
814 let proof_account_worker_pool = WorkerPool::from_builder(
815 rayon::ThreadPoolBuilder::new()
816 .num_threads(proof_account_worker_threads)
817 .thread_name(|i| format!("proof-acct-{i:02}")),
818 )?;
819
820 let prewarming_threads = config.rayon.prewarming_threads.unwrap_or(default_threads);
821 let prewarming_pool = WorkerPool::from_builder(
822 rayon::ThreadPoolBuilder::new()
823 .num_threads(prewarming_threads)
824 .thread_name(|i| format!("prewarm-{i:02}")),
825 )?;
826
827 debug!(
828 default_threads,
829 rpc_threads,
830 storage_threads,
831 proof_storage_worker_threads,
832 proof_account_worker_threads,
833 prewarming_threads,
834 max_blocking_tasks = config.rayon.max_blocking_tasks,
835 "Initialized rayon thread pools"
836 );
837
838 (
839 cpu_pool,
840 rpc_pool,
841 storage_pool,
842 blocking_guard,
843 proof_storage_worker_pool,
844 proof_account_worker_pool,
845 prewarming_pool,
846 )
847 };
848
849 let task_manager_handle = handle.spawn(async move {
850 let result = task_manager.await;
851 if let Err(ref err) = result {
852 debug!("{err}");
853 }
854 result
855 });
856
857 let inner = RuntimeInner {
858 _tokio_runtime: owned_runtime,
859 handle,
860 on_shutdown,
861 task_events_tx,
862 metrics: Default::default(),
863 graceful_tasks,
864 #[cfg(feature = "rayon")]
865 cpu_pool,
866 #[cfg(feature = "rayon")]
867 rpc_pool,
868 #[cfg(feature = "rayon")]
869 storage_pool,
870 #[cfg(feature = "rayon")]
871 blocking_guard,
872 #[cfg(feature = "rayon")]
873 proof_storage_worker_pool,
874 #[cfg(feature = "rayon")]
875 proof_account_worker_pool,
876 #[cfg(feature = "rayon")]
877 prewarming_pool,
878 worker_map: WorkerMap::new(),
879 task_manager_handle: Mutex::new(Some(task_manager_handle)),
880 };
881
882 Ok(Runtime(Arc::new(inner)))
883 }
884}
885
886#[cfg(test)]
887mod tests {
888 use super::*;
889
890 #[test]
891 fn test_runtime_config_default() {
892 let config = RuntimeConfig::default();
893 assert!(matches!(config.tokio, TokioConfig::Owned { .. }));
894 }
895
896 #[test]
897 fn test_runtime_config_existing_handle() {
898 let rt = TokioRuntime::new().unwrap();
899 let config =
900 Runtime::test_config().with_tokio(TokioConfig::existing_handle(rt.handle().clone()));
901 assert!(matches!(config.tokio, TokioConfig::ExistingHandle(_)));
902 }
903
904 #[cfg(feature = "rayon")]
905 #[test]
906 fn test_rayon_config_thread_count() {
907 let config = RayonConfig::default();
908 let count = config.default_thread_count();
909 assert!(count >= 1);
910 }
911
912 #[test]
913 fn test_runtime_builder() {
914 let rt = TokioRuntime::new().unwrap();
915 let config =
916 Runtime::test_config().with_tokio(TokioConfig::existing_handle(rt.handle().clone()));
917 let runtime = RuntimeBuilder::new(config).build().unwrap();
918 let _ = runtime.handle();
919 }
920}