1#[cfg(feature = "rayon")]
10use crate::pool::{BlockingTaskGuard, BlockingTaskPool};
11use crate::{
12 metrics::{IncCounterOnDrop, TaskExecutorMetrics},
13 shutdown::{GracefulShutdown, GracefulShutdownGuard, Shutdown},
14 PanickedTaskError, TaskEvent, TaskManager,
15};
16use futures_util::{
17 future::{select, BoxFuture},
18 Future, FutureExt, TryFutureExt,
19};
20#[cfg(feature = "rayon")]
21use std::thread::available_parallelism;
22use std::{
23 pin::pin,
24 sync::{
25 atomic::{AtomicUsize, Ordering},
26 Arc, Mutex,
27 },
28 time::Duration,
29};
30use tokio::{runtime::Handle, sync::mpsc::UnboundedSender, task::JoinHandle};
31use tracing::{debug, error};
32use tracing_futures::Instrument;
33
34use tokio::runtime::Runtime as TokioRuntime;
35
/// Default keep-alive duration used for the threads of an owned tokio runtime.
pub const DEFAULT_THREAD_KEEP_ALIVE: Duration = Duration::from_secs(15);

/// Default number of CPU cores subtracted from the available parallelism when the rayon
/// pool sizes are derived automatically.
pub const DEFAULT_RESERVED_CPU_CORES: usize = 2;

/// Default number of threads in the storage pool when none is configured.
pub const DEFAULT_STORAGE_POOL_THREADS: usize = 16;

/// Default `max_blocking_tasks` value handed to the blocking task guard.
pub const DEFAULT_MAX_BLOCKING_TASKS: usize = 512;
47
#[derive(Debug, Clone)]
/// Describes where the tokio runtime used by a [`Runtime`] comes from.
pub enum TokioConfig {
    /// Build a new multi-threaded tokio runtime that is owned by the [`Runtime`].
    Owned {
        /// Number of worker threads; `None` leaves the tokio builder's own default in place.
        worker_threads: Option<usize>,
        /// Value passed to the tokio builder's `thread_keep_alive`.
        thread_keep_alive: Duration,
        /// Name given to the threads of the runtime.
        thread_name: &'static str,
    },
    /// Attach to an already existing tokio runtime through its [`Handle`].
    ExistingHandle(Handle),
}
63
64impl Default for TokioConfig {
65 fn default() -> Self {
66 Self::Owned {
67 worker_threads: None,
68 thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
69 thread_name: "tokio-rt",
70 }
71 }
72}
73
74impl TokioConfig {
75 pub const fn existing_handle(handle: Handle) -> Self {
77 Self::ExistingHandle(handle)
78 }
79
80 pub const fn with_worker_threads(worker_threads: usize) -> Self {
82 Self::Owned {
83 worker_threads: Some(worker_threads),
84 thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
85 thread_name: "tokio-rt",
86 }
87 }
88}
89
#[derive(Debug, Clone)]
/// Sizing of the rayon thread pools and the blocking task guard owned by a [`Runtime`].
#[cfg(feature = "rayon")]
pub struct RayonConfig {
    /// Number of threads for the general CPU pool.
    ///
    /// If `None`, the count is the available parallelism minus `reserved_cpu_cores`, but at
    /// least one thread. The resulting value is also the fallback for the RPC and proof pools.
    pub cpu_threads: Option<usize>,
    /// Number of CPU cores subtracted from the available parallelism when `cpu_threads` is
    /// `None`.
    pub reserved_cpu_cores: usize,
    /// Number of threads for the RPC pool; `None` falls back to the default thread count.
    pub rpc_threads: Option<usize>,
    /// Number of threads for the storage pool; `None` falls back to
    /// [`DEFAULT_STORAGE_POOL_THREADS`].
    pub storage_threads: Option<usize>,
    /// Value passed to [`BlockingTaskGuard::new`] when the runtime is built.
    pub max_blocking_tasks: usize,
    /// Number of threads for the proof storage worker pool; `None` falls back to the default
    /// thread count.
    pub proof_storage_worker_threads: Option<usize>,
    /// Number of threads for the proof account worker pool; `None` falls back to the default
    /// thread count.
    pub proof_account_worker_threads: Option<usize>,
}
114
#[cfg(feature = "rayon")]
impl Default for RayonConfig {
    /// Returns a configuration in which every pool size is left unset (and therefore derived
    /// when the pools are built) and the remaining knobs use the module-level defaults.
    fn default() -> Self {
        // All pool sizes start out unset.
        let unset: Option<usize> = None;

        Self {
            cpu_threads: unset,
            reserved_cpu_cores: DEFAULT_RESERVED_CPU_CORES,
            rpc_threads: unset,
            storage_threads: unset,
            max_blocking_tasks: DEFAULT_MAX_BLOCKING_TASKS,
            proof_storage_worker_threads: unset,
            proof_account_worker_threads: unset,
        }
    }
}
129
#[cfg(feature = "rayon")]
impl RayonConfig {
    /// Sets the number of threads for the general CPU pool.
    ///
    /// The same value then also serves as the fallback size for the RPC and proof worker
    /// pools whenever those are left unset.
    pub const fn with_cpu_threads(mut self, cpu_threads: usize) -> Self {
        self.cpu_threads = Some(cpu_threads);
        self
    }

    /// Sets the number of CPU cores subtracted from the available parallelism when the
    /// default thread count is derived.
    pub const fn with_reserved_cpu_cores(mut self, reserved_cpu_cores: usize) -> Self {
        self.reserved_cpu_cores = reserved_cpu_cores;
        self
    }

    /// Sets the value handed to the blocking task guard.
    pub const fn with_max_blocking_tasks(mut self, max_blocking_tasks: usize) -> Self {
        self.max_blocking_tasks = max_blocking_tasks;
        self
    }

    /// Sets the number of threads for the RPC pool.
    pub const fn with_rpc_threads(mut self, rpc_threads: usize) -> Self {
        self.rpc_threads = Some(rpc_threads);
        self
    }

    /// Sets the number of threads for the storage pool.
    pub const fn with_storage_threads(mut self, storage_threads: usize) -> Self {
        self.storage_threads = Some(storage_threads);
        self
    }

    /// Sets the number of threads for the proof storage worker pool.
    pub const fn with_proof_storage_worker_threads(
        mut self,
        proof_storage_worker_threads: usize,
    ) -> Self {
        self.proof_storage_worker_threads = Some(proof_storage_worker_threads);
        self
    }

    /// Sets the number of threads for the proof account worker pool.
    pub const fn with_proof_account_worker_threads(
        mut self,
        proof_account_worker_threads: usize,
    ) -> Self {
        self.proof_account_worker_threads = Some(proof_account_worker_threads);
        self
    }

    /// Computes the thread count used for every pool that has no explicit size.
    ///
    /// An explicit `cpu_threads` value wins. Otherwise the available parallelism minus
    /// `reserved_cpu_cores` is used, clamped to at least one thread; if the available
    /// parallelism cannot be queried the result is `1`.
    fn default_thread_count(&self) -> usize {
        if let Some(threads) = self.cpu_threads {
            return threads;
        }

        available_parallelism()
            .map_or(1, |cores| cores.get().saturating_sub(self.reserved_cpu_cores).max(1))
    }
}
182
#[derive(Debug, Clone, Default)]
/// Complete configuration consumed by [`RuntimeBuilder`] to build a [`Runtime`].
pub struct RuntimeConfig {
    /// Where the tokio runtime comes from.
    pub tokio: TokioConfig,
    #[cfg(feature = "rayon")]
    /// Sizing of the rayon thread pools.
    pub rayon: RayonConfig,
}
192
193impl RuntimeConfig {
194 #[cfg_attr(not(feature = "rayon"), allow(clippy::missing_const_for_fn))]
196 pub fn with_existing_handle(handle: Handle) -> Self {
197 Self {
198 tokio: TokioConfig::ExistingHandle(handle),
199 #[cfg(feature = "rayon")]
200 rayon: RayonConfig::default(),
201 }
202 }
203
204 pub fn with_tokio(mut self, tokio: TokioConfig) -> Self {
206 self.tokio = tokio;
207 self
208 }
209
210 #[cfg(feature = "rayon")]
212 pub const fn with_rayon(mut self, rayon: RayonConfig) -> Self {
213 self.rayon = rayon;
214 self
215 }
216}
217
#[derive(Debug, thiserror::Error)]
/// Errors that can occur while building a [`Runtime`].
pub enum RuntimeBuildError {
    /// Building the owned tokio runtime failed.
    #[error("Failed to build tokio runtime: {0}")]
    TokioBuild(#[from] std::io::Error),
    /// Building one of the rayon thread pools failed.
    #[cfg(feature = "rayon")]
    #[error("Failed to build rayon thread pool: {0}")]
    RayonBuild(#[from] rayon::ThreadPoolBuildError),
}
229
/// Shared state behind a [`Runtime`]; every clone of the runtime points at the same instance.
struct RuntimeInner {
    /// The tokio runtime built for an owned configuration, kept here so it lives as long as
    /// the inner state. `None` when the runtime was attached to an existing handle.
    _tokio_runtime: Option<TokioRuntime>,
    /// Handle used to spawn tasks onto the tokio runtime.
    handle: Handle,
    /// Shutdown signal that spawned tasks are raced against or handed to.
    on_shutdown: Shutdown,
    /// Sender for [`TaskEvent`]s: task panics and graceful shutdown requests.
    task_events_tx: UnboundedSender<TaskEvent>,
    /// Counters for spawned and finished tasks.
    metrics: TaskExecutorMetrics,
    /// Counter shared with every [`GracefulShutdownGuard`]; graceful shutdown waits until it
    /// reads zero.
    graceful_tasks: Arc<AtomicUsize>,
    /// General purpose CPU pool.
    #[cfg(feature = "rayon")]
    cpu_pool: rayon::ThreadPool,
    /// Pool for RPC work, wrapped in a [`BlockingTaskPool`].
    #[cfg(feature = "rayon")]
    rpc_pool: BlockingTaskPool,
    /// Pool for storage work.
    #[cfg(feature = "rayon")]
    storage_pool: rayon::ThreadPool,
    /// Guard constructed from the configured `max_blocking_tasks`.
    #[cfg(feature = "rayon")]
    blocking_guard: BlockingTaskGuard,
    /// Pool for proof storage workers.
    #[cfg(feature = "rayon")]
    proof_storage_worker_pool: rayon::ThreadPool,
    /// Pool for proof account workers.
    #[cfg(feature = "rayon")]
    proof_account_worker_pool: rayon::ThreadPool,
    /// Join handle of the task that drives the task manager; it can be taken out exactly once
    /// through [`Runtime::take_task_manager_handle`].
    task_manager_handle: Mutex<Option<JoinHandle<Result<(), PanickedTaskError>>>>,
}
268
#[derive(Clone)]
/// Cheaply cloneable handle to the shared runtime state.
///
/// Cloning only bumps the reference count of the inner [`Arc`]; all clones share the same
/// tokio handle, shutdown signal, metrics and (with the `rayon` feature) thread pools.
pub struct Runtime(Arc<RuntimeInner>);
279
280impl std::fmt::Debug for Runtime {
281 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282 f.debug_struct("Runtime").field("handle", &self.0.handle).finish()
283 }
284}
285
#[cfg(any(test, feature = "test-utils"))]
impl Default for Runtime {
    /// Builds a runtime for tests: attaches to the current tokio runtime when called from
    /// within one, otherwise builds an owned runtime from the default configuration.
    ///
    /// # Panics
    ///
    /// Panics if the runtime cannot be built.
    fn default() -> Self {
        let config = Handle::try_current()
            .map_or_else(|_| RuntimeConfig::default(), RuntimeConfig::with_existing_handle);

        RuntimeBuilder::new(config).build().expect("failed to build default Runtime")
    }
}
296
297impl Runtime {
300 pub fn with_existing_handle(handle: Handle) -> Result<Self, RuntimeBuildError> {
302 RuntimeBuilder::new(RuntimeConfig::with_existing_handle(handle)).build()
303 }
304}
305
306impl Runtime {
309 pub fn take_task_manager_handle(&self) -> Option<JoinHandle<Result<(), PanickedTaskError>>> {
315 self.0.task_manager_handle.lock().unwrap().take()
316 }
317
318 pub fn handle(&self) -> &Handle {
320 &self.0.handle
321 }
322
323 #[cfg(feature = "rayon")]
325 pub fn cpu_pool(&self) -> &rayon::ThreadPool {
326 &self.0.cpu_pool
327 }
328
329 #[cfg(feature = "rayon")]
331 pub fn rpc_pool(&self) -> &BlockingTaskPool {
332 &self.0.rpc_pool
333 }
334
335 #[cfg(feature = "rayon")]
337 pub fn storage_pool(&self) -> &rayon::ThreadPool {
338 &self.0.storage_pool
339 }
340
341 #[cfg(feature = "rayon")]
343 pub fn blocking_guard(&self) -> BlockingTaskGuard {
344 self.0.blocking_guard.clone()
345 }
346
347 #[cfg(feature = "rayon")]
349 pub fn proof_storage_worker_pool(&self) -> &rayon::ThreadPool {
350 &self.0.proof_storage_worker_pool
351 }
352
353 #[cfg(feature = "rayon")]
355 pub fn proof_account_worker_pool(&self) -> &rayon::ThreadPool {
356 &self.0.proof_account_worker_pool
357 }
358}
359
360impl Runtime {
363 pub fn test() -> Self {
368 let config = match Handle::try_current() {
369 Ok(handle) => Self::test_config().with_tokio(TokioConfig::existing_handle(handle)),
370 Err(_) => Self::test_config(),
371 };
372 RuntimeBuilder::new(config).build().expect("failed to build test Runtime")
373 }
374
375 pub fn test_with_handle(handle: Handle) -> Self {
377 let config = Self::test_config().with_tokio(TokioConfig::existing_handle(handle));
378 RuntimeBuilder::new(config).build().expect("failed to build test Runtime")
379 }
380
381 const fn test_config() -> RuntimeConfig {
382 RuntimeConfig {
383 tokio: TokioConfig::Owned {
384 worker_threads: Some(2),
385 thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
386 thread_name: "tokio-test",
387 },
388 #[cfg(feature = "rayon")]
389 rayon: RayonConfig {
390 cpu_threads: Some(2),
391 reserved_cpu_cores: 0,
392 rpc_threads: Some(2),
393 storage_threads: Some(2),
394 max_blocking_tasks: 16,
395 proof_storage_worker_threads: Some(2),
396 proof_account_worker_threads: Some(2),
397 },
398 }
399 }
400}
401
/// Selects how a task is placed onto the tokio runtime.
enum TaskKind {
    /// Spawn as a regular async task.
    Default,
    /// Run on a `spawn_blocking` thread, driving the future with `block_on`.
    Blocking,
}
411
412impl Runtime {
413 pub fn on_shutdown_signal(&self) -> &Shutdown {
415 &self.0.on_shutdown
416 }
417
418 fn spawn_on_rt<F>(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()>
420 where
421 F: Future<Output = ()> + Send + 'static,
422 {
423 match task_kind {
424 TaskKind::Default => self.0.handle.spawn(fut),
425 TaskKind::Blocking => {
426 let handle = self.0.handle.clone();
427 self.0.handle.spawn_blocking(move || handle.block_on(fut))
428 }
429 }
430 }
431
432 fn spawn_task_as<F>(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()>
434 where
435 F: Future<Output = ()> + Send + 'static,
436 {
437 let on_shutdown = self.0.on_shutdown.clone();
438
439 let finished_counter = match task_kind {
440 TaskKind::Default => self.0.metrics.finished_regular_tasks_total.clone(),
441 TaskKind::Blocking => self.0.metrics.finished_regular_blocking_tasks_total.clone(),
442 };
443
444 let task = {
445 async move {
446 let _inc_counter_on_drop = IncCounterOnDrop::new(finished_counter);
447 let fut = pin!(fut);
448 let _ = select(on_shutdown, fut).await;
449 }
450 }
451 .in_current_span();
452
453 self.spawn_on_rt(task, task_kind)
454 }
455
456 pub fn spawn_task<F>(&self, fut: F) -> JoinHandle<()>
461 where
462 F: Future<Output = ()> + Send + 'static,
463 {
464 self.spawn_task_as(fut, TaskKind::Default)
465 }
466
467 pub fn spawn_blocking_task<F>(&self, fut: F) -> JoinHandle<()>
472 where
473 F: Future<Output = ()> + Send + 'static,
474 {
475 self.spawn_task_as(fut, TaskKind::Blocking)
476 }
477
478 pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
481 where
482 F: FnOnce() -> R + Send + 'static,
483 R: Send + 'static,
484 {
485 self.0.handle.spawn_blocking(func)
486 }
487
488 pub fn spawn_with_signal<F>(&self, f: impl FnOnce(Shutdown) -> F) -> JoinHandle<()>
493 where
494 F: Future<Output = ()> + Send + 'static,
495 {
496 let on_shutdown = self.0.on_shutdown.clone();
497 let fut = f(on_shutdown);
498 let task = fut.in_current_span();
499 self.0.handle.spawn(task)
500 }
501
502 fn spawn_critical_as<F>(
504 &self,
505 name: &'static str,
506 fut: F,
507 task_kind: TaskKind,
508 ) -> JoinHandle<()>
509 where
510 F: Future<Output = ()> + Send + 'static,
511 {
512 let panicked_tasks_tx = self.0.task_events_tx.clone();
513 let on_shutdown = self.0.on_shutdown.clone();
514
515 let task = std::panic::AssertUnwindSafe(fut)
517 .catch_unwind()
518 .map_err(move |error| {
519 let task_error = PanickedTaskError::new(name, error);
520 error!("{task_error}");
521 let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
522 })
523 .in_current_span();
524
525 let finished_critical_tasks_total_metrics =
526 self.0.metrics.finished_critical_tasks_total.clone();
527 let task = async move {
528 let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_total_metrics);
529 let task = pin!(task);
530 let _ = select(on_shutdown, task).await;
531 };
532
533 self.spawn_on_rt(task, task_kind)
534 }
535
536 pub fn spawn_critical_task<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
541 where
542 F: Future<Output = ()> + Send + 'static,
543 {
544 self.spawn_critical_as(name, fut, TaskKind::Default)
545 }
546
547 pub fn spawn_critical_blocking_task<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
552 where
553 F: Future<Output = ()> + Send + 'static,
554 {
555 self.spawn_critical_as(name, fut, TaskKind::Blocking)
556 }
557
558 pub fn spawn_critical_with_shutdown_signal<F>(
562 &self,
563 name: &'static str,
564 f: impl FnOnce(Shutdown) -> F,
565 ) -> JoinHandle<()>
566 where
567 F: Future<Output = ()> + Send + 'static,
568 {
569 let panicked_tasks_tx = self.0.task_events_tx.clone();
570 let on_shutdown = self.0.on_shutdown.clone();
571 let fut = f(on_shutdown);
572
573 let task = std::panic::AssertUnwindSafe(fut)
575 .catch_unwind()
576 .map_err(move |error| {
577 let task_error = PanickedTaskError::new(name, error);
578 error!("{task_error}");
579 let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
580 })
581 .map(drop)
582 .in_current_span();
583
584 self.0.handle.spawn(task)
585 }
586
587 pub fn spawn_critical_with_graceful_shutdown_signal<F>(
608 &self,
609 name: &'static str,
610 f: impl FnOnce(GracefulShutdown) -> F,
611 ) -> JoinHandle<()>
612 where
613 F: Future<Output = ()> + Send + 'static,
614 {
615 let panicked_tasks_tx = self.0.task_events_tx.clone();
616 let on_shutdown = GracefulShutdown::new(
617 self.0.on_shutdown.clone(),
618 GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
619 );
620 let fut = f(on_shutdown);
621
622 let task = std::panic::AssertUnwindSafe(fut)
624 .catch_unwind()
625 .map_err(move |error| {
626 let task_error = PanickedTaskError::new(name, error);
627 error!("{task_error}");
628 let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
629 })
630 .map(drop)
631 .in_current_span();
632
633 self.0.handle.spawn(task)
634 }
635
636 pub fn spawn_with_graceful_shutdown_signal<F>(
656 &self,
657 f: impl FnOnce(GracefulShutdown) -> F,
658 ) -> JoinHandle<()>
659 where
660 F: Future<Output = ()> + Send + 'static,
661 {
662 let on_shutdown = GracefulShutdown::new(
663 self.0.on_shutdown.clone(),
664 GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
665 );
666 let fut = f(on_shutdown);
667
668 self.0.handle.spawn(fut)
669 }
670
671 pub fn initiate_graceful_shutdown(
675 &self,
676 ) -> Result<GracefulShutdown, tokio::sync::mpsc::error::SendError<()>> {
677 self.0
678 .task_events_tx
679 .send(TaskEvent::GracefulShutdown)
680 .map_err(|_send_error_with_task_event| tokio::sync::mpsc::error::SendError(()))?;
681
682 Ok(GracefulShutdown::new(
683 self.0.on_shutdown.clone(),
684 GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
685 ))
686 }
687
688 pub fn graceful_shutdown(&self) {
690 let _ = self.do_graceful_shutdown(None);
691 }
692
693 pub fn graceful_shutdown_with_timeout(&self, timeout: Duration) -> bool {
698 self.do_graceful_shutdown(Some(timeout))
699 }
700
701 fn do_graceful_shutdown(&self, timeout: Option<Duration>) -> bool {
702 let _ = self.0.task_events_tx.send(TaskEvent::GracefulShutdown);
703 let deadline = timeout.map(|t| std::time::Instant::now() + t);
704 while self.0.graceful_tasks.load(Ordering::SeqCst) > 0 {
705 if deadline.is_some_and(|d| std::time::Instant::now() > d) {
706 debug!("graceful shutdown timed out");
707 return false;
708 }
709 std::thread::yield_now();
710 }
711 debug!("gracefully shut down");
712 true
713 }
714}
715
716impl crate::TaskSpawner for Runtime {
719 fn spawn_task(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> {
720 self.0.metrics.inc_regular_tasks();
721 Self::spawn_task(self, fut)
722 }
723
724 fn spawn_critical_task(
725 &self,
726 name: &'static str,
727 fut: BoxFuture<'static, ()>,
728 ) -> JoinHandle<()> {
729 self.0.metrics.inc_critical_tasks();
730 Self::spawn_critical_task(self, name, fut)
731 }
732
733 fn spawn_blocking_task(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> {
734 self.0.metrics.inc_regular_blocking_tasks();
735 Self::spawn_blocking_task(self, fut)
736 }
737
738 fn spawn_critical_blocking_task(
739 &self,
740 name: &'static str,
741 fut: BoxFuture<'static, ()>,
742 ) -> JoinHandle<()> {
743 self.0.metrics.inc_critical_tasks();
744 Self::spawn_critical_blocking_task(self, name, fut)
745 }
746}
747
748impl crate::TaskSpawnerExt for Runtime {
751 fn spawn_critical_with_graceful_shutdown_signal<F>(
752 &self,
753 name: &'static str,
754 f: impl FnOnce(GracefulShutdown) -> F,
755 ) -> JoinHandle<()>
756 where
757 F: Future<Output = ()> + Send + 'static,
758 {
759 Self::spawn_critical_with_graceful_shutdown_signal(self, name, f)
760 }
761
762 fn spawn_with_graceful_shutdown_signal<F>(
763 &self,
764 f: impl FnOnce(GracefulShutdown) -> F,
765 ) -> JoinHandle<()>
766 where
767 F: Future<Output = ()> + Send + 'static,
768 {
769 Self::spawn_with_graceful_shutdown_signal(self, f)
770 }
771}
772
#[derive(Debug, Clone)]
/// Builds a [`Runtime`] from a [`RuntimeConfig`].
pub struct RuntimeBuilder {
    /// Configuration the runtime is built from.
    config: RuntimeConfig,
}
780
781impl RuntimeBuilder {
782 pub const fn new(config: RuntimeConfig) -> Self {
784 Self { config }
785 }
786
787 #[tracing::instrument(level = "debug", skip_all)]
793 pub fn build(self) -> Result<Runtime, RuntimeBuildError> {
794 debug!(?self.config, "Building runtime");
795 let config = self.config;
796
797 let (owned_runtime, handle) = match &config.tokio {
798 TokioConfig::Owned { worker_threads, thread_keep_alive, thread_name } => {
799 let mut builder = tokio::runtime::Builder::new_multi_thread();
800 builder
801 .enable_all()
802 .thread_keep_alive(*thread_keep_alive)
803 .thread_name(*thread_name);
804
805 if let Some(threads) = worker_threads {
806 builder.worker_threads(*threads);
807 }
808
809 let runtime = builder.build()?;
810 let h = runtime.handle().clone();
811 (Some(runtime), h)
812 }
813 TokioConfig::ExistingHandle(h) => (None, h.clone()),
814 };
815
816 let (task_manager, on_shutdown, task_events_tx, graceful_tasks) =
817 TaskManager::new_parts(handle.clone());
818
819 #[cfg(feature = "rayon")]
820 let (
821 cpu_pool,
822 rpc_pool,
823 storage_pool,
824 blocking_guard,
825 proof_storage_worker_pool,
826 proof_account_worker_pool,
827 ) = {
828 let default_threads = config.rayon.default_thread_count();
829 let rpc_threads = config.rayon.rpc_threads.unwrap_or(default_threads);
830
831 let cpu_pool = rayon::ThreadPoolBuilder::new()
832 .num_threads(default_threads)
833 .thread_name(|i| format!("cpu-{i:02}"))
834 .build()?;
835
836 let rpc_raw = rayon::ThreadPoolBuilder::new()
837 .num_threads(rpc_threads)
838 .thread_name(|i| format!("rpc-{i:02}"))
839 .build()?;
840 let rpc_pool = BlockingTaskPool::new(rpc_raw);
841
842 let storage_threads =
843 config.rayon.storage_threads.unwrap_or(DEFAULT_STORAGE_POOL_THREADS);
844 let storage_pool = rayon::ThreadPoolBuilder::new()
845 .num_threads(storage_threads)
846 .thread_name(|i| format!("storage-{i:02}"))
847 .build()?;
848
849 let blocking_guard = BlockingTaskGuard::new(config.rayon.max_blocking_tasks);
850
851 let proof_storage_worker_threads =
852 config.rayon.proof_storage_worker_threads.unwrap_or(default_threads);
853 let proof_storage_worker_pool = rayon::ThreadPoolBuilder::new()
854 .num_threads(proof_storage_worker_threads)
855 .thread_name(|i| format!("proof-strg-{i:02}"))
856 .build()?;
857
858 let proof_account_worker_threads =
859 config.rayon.proof_account_worker_threads.unwrap_or(default_threads);
860 let proof_account_worker_pool = rayon::ThreadPoolBuilder::new()
861 .num_threads(proof_account_worker_threads)
862 .thread_name(|i| format!("proof-acct-{i:02}"))
863 .build()?;
864
865 debug!(
866 default_threads,
867 rpc_threads,
868 storage_threads,
869 proof_storage_worker_threads,
870 proof_account_worker_threads,
871 max_blocking_tasks = config.rayon.max_blocking_tasks,
872 "Initialized rayon thread pools"
873 );
874
875 (
876 cpu_pool,
877 rpc_pool,
878 storage_pool,
879 blocking_guard,
880 proof_storage_worker_pool,
881 proof_account_worker_pool,
882 )
883 };
884
885 let task_manager_handle = handle.spawn(async move {
886 let result = task_manager.await;
887 if let Err(ref err) = result {
888 debug!("{err}");
889 }
890 result
891 });
892
893 let inner = RuntimeInner {
894 _tokio_runtime: owned_runtime,
895 handle,
896 on_shutdown,
897 task_events_tx,
898 metrics: Default::default(),
899 graceful_tasks,
900 #[cfg(feature = "rayon")]
901 cpu_pool,
902 #[cfg(feature = "rayon")]
903 rpc_pool,
904 #[cfg(feature = "rayon")]
905 storage_pool,
906 #[cfg(feature = "rayon")]
907 blocking_guard,
908 #[cfg(feature = "rayon")]
909 proof_storage_worker_pool,
910 #[cfg(feature = "rayon")]
911 proof_account_worker_pool,
912 task_manager_handle: Mutex::new(Some(task_manager_handle)),
913 };
914
915 Ok(Runtime(Arc::new(inner)))
916 }
917}
918
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration builds an owned tokio runtime.
    #[test]
    fn test_runtime_config_default() {
        let RuntimeConfig { tokio, .. } = RuntimeConfig::default();
        assert!(matches!(tokio, TokioConfig::Owned { .. }));
    }

    /// `with_existing_handle` stores the handle instead of requesting a new runtime.
    #[test]
    fn test_runtime_config_existing_handle() {
        let tokio_rt = TokioRuntime::new().unwrap();
        let handle = tokio_rt.handle().clone();
        let RuntimeConfig { tokio, .. } = RuntimeConfig::with_existing_handle(handle);
        assert!(matches!(tokio, TokioConfig::ExistingHandle(_)));
    }

    /// The derived default thread count is never zero.
    #[cfg(feature = "rayon")]
    #[test]
    fn test_rayon_config_thread_count() {
        let threads = RayonConfig::default().default_thread_count();
        assert!(threads >= 1);
    }

    /// A runtime can be built on top of an existing tokio handle.
    #[test]
    fn test_runtime_builder() {
        let tokio_rt = TokioRuntime::new().unwrap();
        let handle = tokio_rt.handle().clone();
        let builder = RuntimeBuilder::new(RuntimeConfig::with_existing_handle(handle));
        let runtime = builder.build().unwrap();
        let _ = runtime.handle();
    }
}
951}