1#[cfg(feature = "rayon")]
10use crate::pool::{build_pool_with_panic_handler, BlockingTaskGuard, BlockingTaskPool, WorkerPool};
11use crate::{
12 metrics::{IncCounterOnDrop, TaskExecutorMetrics},
13 shutdown::{GracefulShutdown, GracefulShutdownGuard, Shutdown},
14 worker_map::WorkerMap,
15 PanickedTaskError, TaskEvent, TaskManager,
16};
17use futures_util::{future::select, Future, FutureExt, TryFutureExt};
18#[cfg(feature = "rayon")]
19use std::sync::OnceLock;
20#[cfg(feature = "rayon")]
21use std::{num::NonZeroUsize, thread::available_parallelism};
22use std::{
23 pin::pin,
24 sync::{
25 atomic::{AtomicUsize, Ordering},
26 Arc, Mutex,
27 },
28 time::{Duration, Instant},
29};
30use tokio::{runtime::Handle, sync::mpsc::UnboundedSender, task::JoinHandle};
31use tracing::{debug, error};
32use tracing_futures::Instrument;
33
34use tokio::runtime::Runtime as TokioRuntime;
35
/// Default keep-alive duration (15 seconds) used for the threads of an owned tokio runtime.
pub const DEFAULT_THREAD_KEEP_ALIVE: Duration = Duration::from_secs(15);

/// Default value of `RayonConfig::reserved_cpu_cores`.
pub const DEFAULT_RESERVED_CPU_CORES: usize = 2;

/// Number of storage pool threads used when `RayonConfig::storage_threads` is `None`.
pub const DEFAULT_STORAGE_POOL_THREADS: usize = 16;

/// Default value of `RayonConfig::max_blocking_tasks`.
pub const DEFAULT_MAX_BLOCKING_TASKS: usize = 512;
47
/// Selects how the tokio runtime backing a [`Runtime`] is obtained.
#[derive(Debug, Clone)]
pub enum TokioConfig {
    /// Build a new multi-threaded tokio runtime that is owned by the [`Runtime`].
    Owned {
        /// Number of worker threads. When `None`, the tokio builder's own default is kept.
        worker_threads: Option<usize>,
        /// Keep-alive duration handed to the tokio builder's `thread_keep_alive`.
        thread_keep_alive: Duration,
        /// Name handed to the tokio builder's `thread_name`.
        thread_name: &'static str,
    },
    /// Reuse an existing tokio runtime through its [`Handle`]; no new runtime is built.
    ExistingHandle(Handle),
}
63
64impl Default for TokioConfig {
65 fn default() -> Self {
66 Self::Owned {
67 worker_threads: None,
68 thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
69 thread_name: "tokio-rt",
70 }
71 }
72}
73
74impl TokioConfig {
75 pub const fn existing_handle(handle: Handle) -> Self {
77 Self::ExistingHandle(handle)
78 }
79
80 pub const fn with_worker_threads(worker_threads: usize) -> Self {
82 Self::Owned {
83 worker_threads: Some(worker_threads),
84 thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
85 thread_name: "tokio-rt",
86 }
87 }
88}
89
/// Thread-count settings for the rayon-based pools of a [`Runtime`].
///
/// Every `Option` field falls back to a value chosen in [`RuntimeBuilder::build`] when `None`.
#[derive(Debug, Clone)]
#[cfg(feature = "rayon")]
pub struct RayonConfig {
    /// Base thread count. When `None`, the available parallelism of the machine is used
    /// (or `1` if it cannot be determined).
    pub cpu_threads: Option<usize>,
    /// Number of CPU cores to leave to other work.
    ///
    /// NOTE(review): the thread count computation in this file reads this value but does not
    /// use it — confirm whether that is intended.
    pub reserved_cpu_cores: usize,
    /// Threads of the RPC pool. Falls back to the base thread count.
    pub rpc_threads: Option<usize>,
    /// Threads of the storage pool. Falls back to [`DEFAULT_STORAGE_POOL_THREADS`].
    pub storage_threads: Option<usize>,
    /// Limit handed to `BlockingTaskGuard::new`.
    pub max_blocking_tasks: usize,
    /// Threads of the proof storage worker pool. Falls back to twice the base thread count.
    pub proof_storage_worker_threads: Option<usize>,
    /// Threads of the proof account worker pool. Falls back to twice the base thread count.
    pub proof_account_worker_threads: Option<usize>,
    /// Threads of the prewarming pool. Falls back to the base thread count.
    pub prewarming_threads: Option<usize>,
    /// Threads of the lazily built BAL streaming pool. Falls back to the base thread count.
    pub bal_streaming_threads: Option<usize>,
}
120
#[cfg(feature = "rayon")]
impl Default for RayonConfig {
    /// Leaves every thread count unset (`None`) so the builder picks its fallbacks, and uses
    /// [`DEFAULT_RESERVED_CPU_CORES`] and [`DEFAULT_MAX_BLOCKING_TASKS`] for the two plain
    /// numeric settings.
    fn default() -> Self {
        Self {
            cpu_threads: None,
            reserved_cpu_cores: DEFAULT_RESERVED_CPU_CORES,
            rpc_threads: None,
            storage_threads: None,
            max_blocking_tasks: DEFAULT_MAX_BLOCKING_TASKS,
            proof_storage_worker_threads: None,
            proof_account_worker_threads: None,
            prewarming_threads: None,
            bal_streaming_threads: None,
        }
    }
}
137
#[cfg(feature = "rayon")]
impl RayonConfig {
    /// Returns the configuration with `reserved_cpu_cores` replaced.
    pub const fn with_reserved_cpu_cores(self, reserved_cpu_cores: usize) -> Self {
        Self { reserved_cpu_cores, ..self }
    }

    /// Returns the configuration with `max_blocking_tasks` replaced.
    pub const fn with_max_blocking_tasks(self, max_blocking_tasks: usize) -> Self {
        Self { max_blocking_tasks, ..self }
    }

    /// Returns the configuration with an explicit RPC pool thread count.
    pub const fn with_rpc_threads(self, rpc_threads: usize) -> Self {
        Self { rpc_threads: Some(rpc_threads), ..self }
    }

    /// Returns the configuration with an explicit storage pool thread count.
    pub const fn with_storage_threads(self, storage_threads: usize) -> Self {
        Self { storage_threads: Some(storage_threads), ..self }
    }

    /// Returns the configuration with an explicit proof storage worker thread count.
    pub const fn with_proof_storage_worker_threads(
        self,
        proof_storage_worker_threads: usize,
    ) -> Self {
        Self { proof_storage_worker_threads: Some(proof_storage_worker_threads), ..self }
    }

    /// Returns the configuration with an explicit proof account worker thread count.
    pub const fn with_proof_account_worker_threads(
        self,
        proof_account_worker_threads: usize,
    ) -> Self {
        Self { proof_account_worker_threads: Some(proof_account_worker_threads), ..self }
    }

    /// Returns the configuration with an explicit prewarming pool thread count.
    pub const fn with_prewarming_threads(self, prewarming_threads: usize) -> Self {
        Self { prewarming_threads: Some(prewarming_threads), ..self }
    }

    /// Returns the configuration with an explicit BAL streaming pool thread count.
    pub const fn with_bal_streaming_threads(self, bal_streaming_threads: usize) -> Self {
        Self { bal_streaming_threads: Some(bal_streaming_threads), ..self }
    }

    /// Computes the base thread count: `cpu_threads` when set, otherwise the machine's
    /// available parallelism, or `1` when that cannot be queried.
    fn default_thread_count(&self) -> usize {
        // `reserved_cpu_cores` is read but deliberately not applied to the result.
        let _ = self.reserved_cpu_cores;
        match self.cpu_threads {
            Some(threads) => threads,
            None => available_parallelism().map_or(1, NonZeroUsize::get),
        }
    }
}
202
/// Complete configuration consumed by [`RuntimeBuilder`].
#[derive(Debug, Clone, Default)]
pub struct RuntimeConfig {
    /// How the tokio runtime is obtained.
    pub tokio: TokioConfig,
    /// Thread-count settings of the rayon pools (only with the `rayon` feature).
    #[cfg(feature = "rayon")]
    pub rayon: RayonConfig,
}
212
213impl RuntimeConfig {
214 pub fn with_tokio(mut self, tokio: TokioConfig) -> Self {
216 self.tokio = tokio;
217 self
218 }
219
220 #[cfg(feature = "rayon")]
222 pub const fn with_rayon(mut self, rayon: RayonConfig) -> Self {
223 self.rayon = rayon;
224 self
225 }
226}
227
/// Errors returned by [`RuntimeBuilder::build`].
#[derive(Debug, thiserror::Error)]
pub enum RuntimeBuildError {
    /// Building the owned tokio runtime failed.
    #[error("Failed to build tokio runtime: {0}")]
    TokioBuild(#[from] std::io::Error),
    /// Building one of the rayon thread pools failed.
    #[cfg(feature = "rayon")]
    #[error("Failed to build rayon thread pool: {0}")]
    RayonBuild(#[from] rayon::ThreadPoolBuildError),
}
239
/// A [`WorkerPool`] that is only built on first access.
#[cfg(feature = "rayon")]
#[derive(Debug)]
struct LazyWorkerPool {
    /// Holds the pool once it has been built; empty until the first `get` call.
    pool: OnceLock<WorkerPool>,
    /// Thread count handed to the rayon builder when the pool is created.
    num_threads: usize,
    /// Prefix of the generated thread names (`{prefix}-{index:02}`).
    thread_name_prefix: &'static str,
}
247
#[cfg(feature = "rayon")]
impl LazyWorkerPool {
    /// Records the pool parameters without building the pool.
    const fn new(num_threads: usize, thread_name_prefix: &'static str) -> Self {
        Self { pool: OnceLock::new(), num_threads, thread_name_prefix }
    }

    /// Returns the pool, building it on the first call.
    ///
    /// # Panics
    ///
    /// Panics if the underlying rayon pool cannot be built.
    fn get(&self) -> &WorkerPool {
        self.pool.get_or_init(|| {
            // Copy the `'static` prefix so the naming closure owns everything it needs.
            let thread_name_prefix = self.thread_name_prefix;
            let builder = rayon::ThreadPoolBuilder::new()
                .num_threads(self.num_threads)
                .thread_name(move |i| format!("{thread_name_prefix}-{i:02}"));

            match WorkerPool::from_builder(builder) {
                Ok(pool) => pool,
                Err(err) => panic!("failed to build {thread_name_prefix} worker pool: {err}"),
            }
        })
    }
}
267
/// Shared state behind a [`Runtime`]; all clones of a `Runtime` point at one instance.
struct RuntimeInner {
    /// The tokio runtime built by the builder, if any; `None` when an existing handle is
    /// reused. Never read, only held.
    _tokio_runtime: Option<TokioRuntime>,
    /// Handle used for all task spawning.
    handle: Handle,
    /// Shutdown signal that spawned tasks are raced against.
    on_shutdown: Shutdown,
    /// Channel used to report task panics and graceful shutdown requests.
    task_events_tx: UnboundedSender<TaskEvent>,
    /// Task counters.
    metrics: TaskExecutorMetrics,
    /// Counter polled by graceful shutdown; shutdown waits until it reaches zero.
    graceful_tasks: Arc<AtomicUsize>,
    /// General purpose rayon pool (`cpu-NN` threads).
    #[cfg(feature = "rayon")]
    cpu_pool: rayon::ThreadPool,
    /// Pool for RPC work (`rpc-NN` threads).
    #[cfg(feature = "rayon")]
    rpc_pool: BlockingTaskPool,
    /// Pool for storage work (`storage-NN` threads).
    #[cfg(feature = "rayon")]
    storage_pool: rayon::ThreadPool,
    /// Guard created from `RayonConfig::max_blocking_tasks`.
    #[cfg(feature = "rayon")]
    blocking_guard: BlockingTaskGuard,
    /// Worker pool for proof storage work (`proof-strg-NN` threads).
    #[cfg(feature = "rayon")]
    proof_storage_worker_pool: WorkerPool,
    /// Worker pool for proof account work (`proof-acct-NN` threads).
    #[cfg(feature = "rayon")]
    proof_account_worker_pool: WorkerPool,
    /// Worker pool for prewarming (`prewarm-NN` threads).
    #[cfg(feature = "rayon")]
    prewarming_pool: WorkerPool,
    /// BAL streaming pool, built on first use (`bal-stream-NN` threads).
    #[cfg(feature = "rayon")]
    bal_streaming_pool: LazyWorkerPool,
    /// Backs `Runtime::spawn_blocking_named`.
    worker_map: WorkerMap,
    /// Join handle of the spawned task manager; can be taken out exactly once.
    task_manager_handle: Mutex<Option<JoinHandle<Result<(), PanickedTaskError>>>>,
}
315
/// Handle to the task runtime: a tokio handle plus, with the `rayon` feature, a set of
/// thread pools.
///
/// Cloning is cheap; all clones share the same state through an [`Arc`].
#[derive(Clone)]
pub struct Runtime(Arc<RuntimeInner>);
326
327impl std::fmt::Debug for Runtime {
328 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
329 f.debug_struct("Runtime").field("handle", &self.0.handle).finish()
330 }
331}
332
impl Runtime {
    /// Takes the join handle of the task manager out of the runtime.
    ///
    /// Returns `Some` on the first call and `None` on every later call.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn take_task_manager_handle(&self) -> Option<JoinHandle<Result<(), PanickedTaskError>>> {
        self.0.task_manager_handle.lock().unwrap().take()
    }

    /// Returns the tokio handle used for spawning.
    pub fn handle(&self) -> &Handle {
        &self.0.handle
    }

    /// Returns the general purpose rayon pool.
    #[cfg(feature = "rayon")]
    pub fn cpu_pool(&self) -> &rayon::ThreadPool {
        &self.0.cpu_pool
    }

    /// Returns the RPC pool.
    #[cfg(feature = "rayon")]
    pub fn rpc_pool(&self) -> &BlockingTaskPool {
        &self.0.rpc_pool
    }

    /// Returns the storage pool.
    #[cfg(feature = "rayon")]
    pub fn storage_pool(&self) -> &rayon::ThreadPool {
        &self.0.storage_pool
    }

    /// Returns a clone of the blocking task guard.
    #[cfg(feature = "rayon")]
    pub fn blocking_guard(&self) -> BlockingTaskGuard {
        self.0.blocking_guard.clone()
    }

    /// Returns the proof storage worker pool.
    #[cfg(feature = "rayon")]
    pub fn proof_storage_worker_pool(&self) -> &WorkerPool {
        &self.0.proof_storage_worker_pool
    }

    /// Returns the proof account worker pool.
    #[cfg(feature = "rayon")]
    pub fn proof_account_worker_pool(&self) -> &WorkerPool {
        &self.0.proof_account_worker_pool
    }

    /// Returns the prewarming pool.
    #[cfg(feature = "rayon")]
    pub fn prewarming_pool(&self) -> &WorkerPool {
        &self.0.prewarming_pool
    }

    /// Returns the BAL streaming pool, building it on first access.
    ///
    /// # Panics
    ///
    /// Panics if the pool has to be built and building it fails.
    #[cfg(feature = "rayon")]
    pub fn bal_streaming_pool(&self) -> &WorkerPool {
        self.0.bal_streaming_pool.get()
    }
}
398
399impl Runtime {
402 pub fn test() -> Self {
407 let config = match Handle::try_current() {
408 Ok(handle) => Self::test_config().with_tokio(TokioConfig::existing_handle(handle)),
409 Err(_) => Self::test_config(),
410 };
411 RuntimeBuilder::new(config).build().expect("failed to build test Runtime")
412 }
413
414 const fn test_config() -> RuntimeConfig {
415 RuntimeConfig {
416 tokio: TokioConfig::Owned {
417 worker_threads: Some(2),
418 thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
419 thread_name: "tokio-test",
420 },
421 #[cfg(feature = "rayon")]
422 rayon: RayonConfig {
423 cpu_threads: Some(2),
424 reserved_cpu_cores: 0,
425 rpc_threads: Some(2),
426 storage_threads: Some(2),
427 max_blocking_tasks: 16,
428 proof_storage_worker_threads: Some(2),
429 proof_account_worker_threads: Some(2),
430 prewarming_threads: Some(2),
431 bal_streaming_threads: Some(2),
432 },
433 }
434 }
435}
436
/// Selects how a future is placed on the tokio runtime.
enum TaskKind {
    /// Spawn the future as a regular async task via `Handle::spawn`.
    Default,
    /// Drive the future with `block_on` inside a `spawn_blocking` closure.
    Blocking,
}
446
447impl Runtime {
448 pub fn on_shutdown_signal(&self) -> &Shutdown {
450 &self.0.on_shutdown
451 }
452
453 fn spawn_on_rt<F>(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()>
455 where
456 F: Future<Output = ()> + Send + 'static,
457 {
458 match task_kind {
459 TaskKind::Default => self.0.handle.spawn(fut),
460 TaskKind::Blocking => {
461 let handle = self.0.handle.clone();
462 self.0.handle.spawn_blocking(move || handle.block_on(fut))
463 }
464 }
465 }
466
467 fn spawn_task_as<F>(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()>
469 where
470 F: Future<Output = ()> + Send + 'static,
471 {
472 match task_kind {
473 TaskKind::Default => self.0.metrics.inc_regular_tasks(),
474 TaskKind::Blocking => self.0.metrics.inc_regular_blocking_tasks(),
475 }
476 let on_shutdown = self.0.on_shutdown.clone();
477
478 let finished_counter = match task_kind {
479 TaskKind::Default => self.0.metrics.finished_regular_tasks_total.clone(),
480 TaskKind::Blocking => self.0.metrics.finished_regular_blocking_tasks_total.clone(),
481 };
482
483 let task = {
484 async move {
485 let _inc_counter_on_drop = IncCounterOnDrop::new(finished_counter);
486 let fut = pin!(fut);
487 let _ = select(on_shutdown, fut).await;
488 }
489 }
490 .in_current_span();
491
492 self.spawn_on_rt(task, task_kind)
493 }
494
495 pub fn spawn_task<F>(&self, fut: F) -> JoinHandle<()>
500 where
501 F: Future<Output = ()> + Send + 'static,
502 {
503 self.spawn_task_as(fut, TaskKind::Default)
504 }
505
506 pub fn spawn_blocking_task<F>(&self, fut: F) -> JoinHandle<()>
511 where
512 F: Future<Output = ()> + Send + 'static,
513 {
514 self.spawn_task_as(fut, TaskKind::Blocking)
515 }
516
517 pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
520 where
521 F: FnOnce() -> R + Send + 'static,
522 R: Send + 'static,
523 {
524 self.0.handle.spawn_blocking(func)
525 }
526
527 pub fn spawn_drop<T: Send + 'static>(&self, value: T) {
533 self.spawn_blocking_named("drop", move || drop(value));
534 }
535
536 pub fn spawn_blocking_named<F, R>(&self, name: &'static str, func: F) -> crate::LazyHandle<R>
548 where
549 F: FnOnce() -> R + Send + 'static,
550 R: Send + 'static,
551 {
552 crate::LazyHandle::new(self.0.worker_map.spawn_on(name, func))
553 }
554
555 pub fn spawn_with_signal<F>(&self, f: impl FnOnce(Shutdown) -> F) -> JoinHandle<()>
560 where
561 F: Future<Output = ()> + Send + 'static,
562 {
563 let on_shutdown = self.0.on_shutdown.clone();
564 let fut = f(on_shutdown);
565 let task = fut.in_current_span();
566 self.0.handle.spawn(task)
567 }
568
569 fn spawn_critical_as<F>(
571 &self,
572 name: &'static str,
573 fut: F,
574 task_kind: TaskKind,
575 ) -> JoinHandle<()>
576 where
577 F: Future<Output = ()> + Send + 'static,
578 {
579 self.0.metrics.inc_critical_tasks();
580 let panicked_tasks_tx = self.0.task_events_tx.clone();
581 let on_shutdown = self.0.on_shutdown.clone();
582
583 let task = std::panic::AssertUnwindSafe(fut)
585 .catch_unwind()
586 .map_err(move |error| {
587 let task_error = PanickedTaskError::new(name, error);
588 error!("{task_error}");
589 let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
590 })
591 .in_current_span();
592
593 let finished_critical_tasks_total_metrics =
594 self.0.metrics.finished_critical_tasks_total.clone();
595 let task = async move {
596 let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_total_metrics);
597 let task = pin!(task);
598 let _ = select(on_shutdown, task).await;
599 };
600
601 self.spawn_on_rt(task, task_kind)
602 }
603
604 pub fn spawn_critical_task<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
609 where
610 F: Future<Output = ()> + Send + 'static,
611 {
612 self.spawn_critical_as(name, fut, TaskKind::Default)
613 }
614
615 pub fn spawn_critical_blocking_task<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
620 where
621 F: Future<Output = ()> + Send + 'static,
622 {
623 self.spawn_critical_as(name, fut, TaskKind::Blocking)
624 }
625
626 pub fn spawn_critical_with_graceful_shutdown_signal<F>(
647 &self,
648 name: &'static str,
649 f: impl FnOnce(GracefulShutdown) -> F,
650 ) -> JoinHandle<()>
651 where
652 F: Future<Output = ()> + Send + 'static,
653 {
654 let panicked_tasks_tx = self.0.task_events_tx.clone();
655 let on_shutdown = GracefulShutdown::new(
656 self.0.on_shutdown.clone(),
657 GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
658 );
659 let fut = f(on_shutdown);
660
661 let task = std::panic::AssertUnwindSafe(fut)
663 .catch_unwind()
664 .map_err(move |error| {
665 let task_error = PanickedTaskError::new(name, error);
666 error!("{task_error}");
667 let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
668 })
669 .map(drop)
670 .in_current_span();
671
672 self.0.handle.spawn(task)
673 }
674
675 pub fn spawn_with_graceful_shutdown_signal<F>(
695 &self,
696 f: impl FnOnce(GracefulShutdown) -> F,
697 ) -> JoinHandle<()>
698 where
699 F: Future<Output = ()> + Send + 'static,
700 {
701 let on_shutdown = GracefulShutdown::new(
702 self.0.on_shutdown.clone(),
703 GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
704 );
705 let fut = f(on_shutdown);
706
707 self.0.handle.spawn(fut)
708 }
709
710 pub fn initiate_graceful_shutdown(
714 &self,
715 ) -> Result<GracefulShutdown, tokio::sync::mpsc::error::SendError<()>> {
716 self.0
717 .task_events_tx
718 .send(TaskEvent::GracefulShutdown)
719 .map_err(|_send_error_with_task_event| tokio::sync::mpsc::error::SendError(()))?;
720
721 Ok(GracefulShutdown::new(
722 self.0.on_shutdown.clone(),
723 GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
724 ))
725 }
726
727 pub fn graceful_shutdown(&self) {
729 let _ = self.do_graceful_shutdown(None);
730 }
731
732 pub fn graceful_shutdown_with_timeout(&self, timeout: Duration) -> bool {
737 self.do_graceful_shutdown(Some(timeout))
738 }
739
740 fn do_graceful_shutdown(&self, timeout: Option<Duration>) -> bool {
741 let _ = self.0.task_events_tx.send(TaskEvent::GracefulShutdown);
742 let deadline = timeout.map(|t| Instant::now() + t);
743 while self.0.graceful_tasks.load(Ordering::SeqCst) > 0 {
744 if deadline.is_some_and(|d| Instant::now() > d) {
745 debug!("graceful shutdown timed out");
746 return false;
747 }
748 std::thread::yield_now();
749 }
750 debug!("gracefully shut down");
751 true
752 }
753}
754
/// Builds a [`Runtime`] from a [`RuntimeConfig`].
#[derive(Debug, Clone)]
pub struct RuntimeBuilder {
    /// Configuration consumed by `build`.
    config: RuntimeConfig,
}
762
impl RuntimeBuilder {
    /// Creates a builder for the given configuration.
    pub const fn new(config: RuntimeConfig) -> Self {
        Self { config }
    }

    /// Builds the [`Runtime`].
    ///
    /// Steps, in order: obtain the tokio handle (building an owned runtime if configured),
    /// create the task manager parts, build the rayon pools (with the `rayon` feature), and
    /// spawn the task manager onto the tokio runtime.
    ///
    /// # Errors
    ///
    /// Returns [`RuntimeBuildError`] if the tokio runtime or one of the rayon pools cannot be
    /// built.
    #[tracing::instrument(name = "RuntimeBuilder::build", level = "debug", skip_all)]
    pub fn build(self) -> Result<Runtime, RuntimeBuildError> {
        debug!(?self.config, "Building runtime");
        let config = self.config;

        // Either build and keep a new runtime, or only borrow an existing one via its handle.
        let (owned_runtime, handle) = match &config.tokio {
            TokioConfig::Owned { worker_threads, thread_keep_alive, thread_name } => {
                let mut builder = tokio::runtime::Builder::new_multi_thread();
                builder
                    .enable_all()
                    .thread_keep_alive(*thread_keep_alive)
                    .thread_name(*thread_name);

                // Without an explicit count the tokio builder's default is used.
                if let Some(threads) = worker_threads {
                    builder.worker_threads(*threads);
                }

                let runtime = builder.build()?;
                let h = runtime.handle().clone();
                (Some(runtime), h)
            }
            TokioConfig::ExistingHandle(h) => (None, h.clone()),
        };

        let (task_manager, on_shutdown, task_events_tx, graceful_tasks) =
            TaskManager::new_parts(handle.clone());

        #[cfg(feature = "rayon")]
        let (
            cpu_pool,
            rpc_pool,
            storage_pool,
            blocking_guard,
            proof_storage_worker_pool,
            proof_account_worker_pool,
            prewarming_pool,
            bal_streaming_pool,
        ) = {
            // Base thread count that most pools fall back to.
            let default_threads = config.rayon.default_thread_count();
            let rpc_threads = config.rayon.rpc_threads.unwrap_or(default_threads);

            let cpu_pool = build_pool_with_panic_handler(
                rayon::ThreadPoolBuilder::new()
                    .num_threads(default_threads)
                    .thread_name(|i| format!("cpu-{i:02}")),
            )?;

            let rpc_raw = build_pool_with_panic_handler(
                rayon::ThreadPoolBuilder::new()
                    .num_threads(rpc_threads)
                    .thread_name(|i| format!("rpc-{i:02}")),
            )?;
            let rpc_pool = BlockingTaskPool::new(rpc_raw);

            // The storage pool has its own fixed fallback instead of the base thread count.
            let storage_threads =
                config.rayon.storage_threads.unwrap_or(DEFAULT_STORAGE_POOL_THREADS);
            let storage_pool = build_pool_with_panic_handler(
                rayon::ThreadPoolBuilder::new()
                    .num_threads(storage_threads)
                    .thread_name(|i| format!("storage-{i:02}")),
            )?;

            let blocking_guard = BlockingTaskGuard::new(config.rayon.max_blocking_tasks);

            // Both proof worker pools fall back to twice the base thread count.
            let proof_storage_worker_threads =
                config.rayon.proof_storage_worker_threads.unwrap_or(default_threads * 2);
            let proof_storage_worker_pool = WorkerPool::from_builder(
                rayon::ThreadPoolBuilder::new()
                    .num_threads(proof_storage_worker_threads)
                    .thread_name(|i| format!("proof-strg-{i:02}")),
            )?;

            let proof_account_worker_threads =
                config.rayon.proof_account_worker_threads.unwrap_or(default_threads * 2);
            let proof_account_worker_pool = WorkerPool::from_builder(
                rayon::ThreadPoolBuilder::new()
                    .num_threads(proof_account_worker_threads)
                    .thread_name(|i| format!("proof-acct-{i:02}")),
            )?;

            let prewarming_threads = config.rayon.prewarming_threads.unwrap_or(default_threads);
            let prewarming_pool = WorkerPool::from_builder(
                rayon::ThreadPoolBuilder::new()
                    .num_threads(prewarming_threads)
                    .thread_name(|i| format!("prewarm-{i:02}")),
            )?;

            // Only the parameters are recorded here; the pool itself is built on first use.
            let bal_streaming_threads =
                config.rayon.bal_streaming_threads.unwrap_or(default_threads);
            let bal_streaming_pool = LazyWorkerPool::new(bal_streaming_threads, "bal-stream");

            debug!(
                default_threads,
                rpc_threads,
                storage_threads,
                proof_storage_worker_threads,
                proof_account_worker_threads,
                prewarming_threads,
                bal_streaming_threads,
                max_blocking_tasks = config.rayon.max_blocking_tasks,
                "Initialized rayon thread pools and configured lazy BAL streaming pool"
            );

            (
                cpu_pool,
                rpc_pool,
                storage_pool,
                blocking_guard,
                proof_storage_worker_pool,
                proof_account_worker_pool,
                prewarming_pool,
                bal_streaming_pool,
            )
        };

        // Drive the task manager on the runtime; its result stays available through the
        // join handle stored in the runtime.
        let task_manager_handle = handle.spawn(async move {
            let result = task_manager.await;
            if let Err(ref err) = result {
                debug!("{err}");
            }
            result
        });

        let inner = RuntimeInner {
            _tokio_runtime: owned_runtime,
            handle,
            on_shutdown,
            task_events_tx,
            metrics: Default::default(),
            graceful_tasks,
            #[cfg(feature = "rayon")]
            cpu_pool,
            #[cfg(feature = "rayon")]
            rpc_pool,
            #[cfg(feature = "rayon")]
            storage_pool,
            #[cfg(feature = "rayon")]
            blocking_guard,
            #[cfg(feature = "rayon")]
            proof_storage_worker_pool,
            #[cfg(feature = "rayon")]
            proof_account_worker_pool,
            #[cfg(feature = "rayon")]
            prewarming_pool,
            #[cfg(feature = "rayon")]
            bal_streaming_pool,
            worker_map: WorkerMap::new(),
            task_manager_handle: Mutex::new(Some(task_manager_handle)),
        };

        Ok(Runtime(Arc::new(inner)))
    }
}
927
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration asks for an owned tokio runtime.
    #[test]
    fn test_runtime_config_default() {
        let RuntimeConfig { tokio, .. } = RuntimeConfig::default();
        assert!(matches!(tokio, TokioConfig::Owned { .. }));
    }

    /// Replacing the tokio section with an existing handle is reflected in the config.
    #[test]
    fn test_runtime_config_existing_handle() {
        let rt = TokioRuntime::new().unwrap();
        let handle = rt.handle().clone();
        let config = Runtime::test_config().with_tokio(TokioConfig::existing_handle(handle));
        assert!(matches!(config.tokio, TokioConfig::ExistingHandle(_)));
    }

    /// The base thread count is never zero.
    #[cfg(feature = "rayon")]
    #[test]
    fn test_rayon_config_thread_count() {
        let count = RayonConfig::default().default_thread_count();
        assert!(count >= 1);
    }

    /// A runtime can be built on top of an existing tokio runtime.
    #[test]
    fn test_runtime_builder() {
        let rt = TokioRuntime::new().unwrap();
        let tokio = TokioConfig::existing_handle(rt.handle().clone());
        let config = Runtime::test_config().with_tokio(tokio);
        let runtime = RuntimeBuilder::new(config).build().unwrap();
        let _ = runtime.handle();
    }

    /// The BAL streaming pool is only built when first requested.
    #[cfg(feature = "rayon")]
    #[test]
    fn test_bal_streaming_pool_is_lazy() {
        let runtime = Runtime::test();
        let lazy = &runtime.0.bal_streaming_pool;

        assert!(lazy.pool.get().is_none());

        assert_eq!(runtime.bal_streaming_pool().current_num_threads(), 2);
        assert!(lazy.pool.get().is_some());
    }
}
973}