1#[cfg(feature = "rayon")]
10use crate::pool::{build_pool_with_panic_handler, BlockingTaskGuard, BlockingTaskPool, WorkerPool};
11use crate::{
12 metrics::{IncCounterOnDrop, TaskExecutorMetrics},
13 shutdown::{GracefulShutdown, GracefulShutdownGuard, Shutdown},
14 worker_map::WorkerMap,
15 PanickedTaskError, TaskEvent, TaskManager,
16};
17use futures_util::{future::select, Future, FutureExt, TryFutureExt};
18#[cfg(feature = "rayon")]
19use std::{num::NonZeroUsize, thread::available_parallelism};
20use std::{
21 pin::pin,
22 sync::{
23 atomic::{AtomicUsize, Ordering},
24 Arc, Mutex,
25 },
26 thread,
27 time::{Duration, Instant},
28};
29use tokio::{runtime::Handle, sync::mpsc::UnboundedSender, task::JoinHandle};
30use tracing::{debug, error};
31use tracing_futures::Instrument;
32
33use tokio::runtime::Runtime as TokioRuntime;
34
/// Default keep-alive duration (15 seconds) applied to the threads of an owned tokio runtime.
pub const DEFAULT_THREAD_KEEP_ALIVE: Duration = Duration::from_secs(15);

/// Default value of `RayonConfig::reserved_cpu_cores`.
pub const DEFAULT_RESERVED_CPU_CORES: usize = 2;

/// Number of storage pool threads used when `RayonConfig::storage_threads` is `None`.
pub const DEFAULT_STORAGE_POOL_THREADS: usize = 16;

#[cfg(feature = "rayon")]
/// Number of state trie overlay worker threads used when
/// `RayonConfig::state_trie_overlay_worker_threads` is `None`.
pub const DEFAULT_STATE_TRIE_OVERLAY_WORKER_THREADS: usize = 4;

/// Default value of `RayonConfig::max_blocking_tasks`.
pub const DEFAULT_MAX_BLOCKING_TASKS: usize = 512;
50
#[derive(Debug, Clone)]
/// Selects where the tokio runtime used by a `Runtime` comes from.
pub enum TokioConfig {
    /// Build a new multi-threaded tokio runtime that is owned by the resulting `Runtime`.
    Owned {
        /// Number of worker threads; when `None` the tokio builder's own default is kept.
        worker_threads: Option<usize>,
        /// Value forwarded to the tokio builder's `thread_keep_alive`.
        thread_keep_alive: Duration,
        /// Name given to the threads of the owned runtime.
        thread_name: &'static str,
    },
    /// Reuse an already running tokio runtime through its handle; no runtime is built.
    ExistingHandle(Handle),
}
66
67impl Default for TokioConfig {
68 fn default() -> Self {
69 Self::Owned {
70 worker_threads: None,
71 thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
72 thread_name: "tokio-rt",
73 }
74 }
75}
76
77impl TokioConfig {
78 pub const fn existing_handle(handle: Handle) -> Self {
80 Self::ExistingHandle(handle)
81 }
82
83 pub const fn with_worker_threads(worker_threads: usize) -> Self {
85 Self::Owned {
86 worker_threads: Some(worker_threads),
87 thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
88 thread_name: "tokio-rt",
89 }
90 }
91}
92
#[derive(Debug, Clone)]
/// Thread-count configuration for the rayon pools and worker pools owned by a `Runtime`.
///
/// Every `Option` field set to `None` is replaced by a fallback when the runtime is built.
#[cfg(feature = "rayon")]
pub struct RayonConfig {
    /// Thread count of the CPU pool and base value for several fallbacks below.
    /// When `None`, the available parallelism of the machine is used (at least 1).
    pub cpu_threads: Option<usize>,
    /// Number of CPU cores to leave to other work.
    /// NOTE: the thread-count computation in this file currently reads and discards this value.
    pub reserved_cpu_cores: usize,
    /// Thread count of the RPC pool; falls back to the default thread count.
    pub rpc_threads: Option<usize>,
    /// Thread count of the storage pool; falls back to `DEFAULT_STORAGE_POOL_THREADS`.
    pub storage_threads: Option<usize>,
    /// Limit handed to the `BlockingTaskGuard` created for the runtime.
    pub max_blocking_tasks: usize,
    /// Thread count of the proof storage worker pool; falls back to twice the default count.
    pub proof_storage_worker_threads: Option<usize>,
    /// Thread count of the proof account worker pool; falls back to twice the default count.
    pub proof_account_worker_threads: Option<usize>,
    /// Thread count of the prewarming pool; falls back to the default thread count.
    pub prewarming_threads: Option<usize>,
    /// Thread count of the BAL streaming pool; falls back to the default thread count.
    pub bal_streaming_threads: Option<usize>,
    /// Thread count of the state trie overlay worker pool; falls back to
    /// `DEFAULT_STATE_TRIE_OVERLAY_WORKER_THREADS`.
    pub state_trie_overlay_worker_threads: Option<usize>,
}
126
#[cfg(feature = "rayon")]
impl Default for RayonConfig {
    /// Returns a configuration where every optional thread count is unset and the two plain
    /// limits use [`DEFAULT_RESERVED_CPU_CORES`] and [`DEFAULT_MAX_BLOCKING_TASKS`].
    fn default() -> Self {
        Self {
            // Plain limits with crate-level defaults.
            reserved_cpu_cores: DEFAULT_RESERVED_CPU_CORES,
            max_blocking_tasks: DEFAULT_MAX_BLOCKING_TASKS,
            // Everything else is resolved to a fallback when the runtime is built.
            cpu_threads: None,
            rpc_threads: None,
            storage_threads: None,
            proof_storage_worker_threads: None,
            proof_account_worker_threads: None,
            prewarming_threads: None,
            bal_streaming_threads: None,
            state_trie_overlay_worker_threads: None,
        }
    }
}
144
#[cfg(feature = "rayon")]
impl RayonConfig {
    /// Sets the number of reserved CPU cores.
    pub const fn with_reserved_cpu_cores(self, reserved_cpu_cores: usize) -> Self {
        Self { reserved_cpu_cores, ..self }
    }

    /// Sets the limit of concurrently running blocking tasks.
    pub const fn with_max_blocking_tasks(self, max_blocking_tasks: usize) -> Self {
        Self { max_blocking_tasks, ..self }
    }

    /// Sets an explicit thread count for the RPC pool.
    pub const fn with_rpc_threads(self, rpc_threads: usize) -> Self {
        Self { rpc_threads: Some(rpc_threads), ..self }
    }

    /// Sets an explicit thread count for the storage pool.
    pub const fn with_storage_threads(self, storage_threads: usize) -> Self {
        Self { storage_threads: Some(storage_threads), ..self }
    }

    /// Sets an explicit thread count for the proof storage worker pool.
    pub const fn with_proof_storage_worker_threads(
        self,
        proof_storage_worker_threads: usize,
    ) -> Self {
        Self { proof_storage_worker_threads: Some(proof_storage_worker_threads), ..self }
    }

    /// Sets an explicit thread count for the proof account worker pool.
    pub const fn with_proof_account_worker_threads(
        self,
        proof_account_worker_threads: usize,
    ) -> Self {
        Self { proof_account_worker_threads: Some(proof_account_worker_threads), ..self }
    }

    /// Sets an explicit thread count for the prewarming pool.
    pub const fn with_prewarming_threads(self, prewarming_threads: usize) -> Self {
        Self { prewarming_threads: Some(prewarming_threads), ..self }
    }

    /// Sets an explicit thread count for the BAL streaming pool.
    pub const fn with_bal_streaming_threads(self, bal_streaming_threads: usize) -> Self {
        Self { bal_streaming_threads: Some(bal_streaming_threads), ..self }
    }

    /// Sets an explicit thread count for the state trie overlay worker pool.
    pub const fn with_state_trie_overlay_worker_threads(
        self,
        state_trie_overlay_worker_threads: usize,
    ) -> Self {
        Self { state_trie_overlay_worker_threads: Some(state_trie_overlay_worker_threads), ..self }
    }

    /// Returns the base thread count: `cpu_threads` when set, otherwise the machine's available
    /// parallelism, or 1 if that cannot be determined.
    fn default_thread_count(&self) -> usize {
        // `reserved_cpu_cores` is read and discarded on purpose: it does not influence the
        // result at the moment.
        let _ = self.reserved_cpu_cores;
        match self.cpu_threads {
            Some(explicit) => explicit,
            None => available_parallelism().map_or(1, NonZeroUsize::get),
        }
    }
}
218
#[derive(Debug, Clone, Default)]
/// Complete configuration consumed by `RuntimeBuilder` to build a `Runtime`.
pub struct RuntimeConfig {
    /// Where the tokio runtime comes from (owned or existing handle).
    pub tokio: TokioConfig,
    /// Thread-count configuration of the rayon and worker pools.
    #[cfg(feature = "rayon")]
    pub rayon: RayonConfig,
}
228
229impl RuntimeConfig {
230 pub fn with_tokio(mut self, tokio: TokioConfig) -> Self {
232 self.tokio = tokio;
233 self
234 }
235
236 #[cfg(feature = "rayon")]
238 pub const fn with_rayon(mut self, rayon: RayonConfig) -> Self {
239 self.rayon = rayon;
240 self
241 }
242}
243
#[derive(Debug, thiserror::Error)]
/// Errors that can occur while building a `Runtime`.
pub enum RuntimeBuildError {
    #[error("Failed to build tokio runtime: {0}")]
    /// Building the owned tokio runtime failed with an I/O error.
    TokioBuild(#[from] std::io::Error),
    #[cfg(feature = "rayon")]
    /// Building one of the rayon thread pools failed.
    #[error("Failed to build rayon thread pool: {0}")]
    RayonBuild(#[from] rayon::ThreadPoolBuildError),
}
255
/// Shared state behind a `Runtime`; every clone of the runtime points at the same instance.
struct RuntimeInner {
    /// The owned tokio runtime, if the builder created one (`None` when attached to an existing
    /// handle). Never read; stored so the runtime lives as long as this struct.
    _tokio_runtime: Option<TokioRuntime>,
    /// Handle used for all spawning onto the tokio runtime.
    handle: Handle,
    /// Shutdown signal that spawned tasks are raced against.
    on_shutdown: Shutdown,
    /// Sender used to report task panics and graceful-shutdown requests as `TaskEvent`s.
    task_events_tx: UnboundedSender<TaskEvent>,
    /// Counters for spawned and finished tasks.
    metrics: TaskExecutorMetrics,
    /// Counter that graceful shutdown waits on until it drops to zero; handed to every
    /// `GracefulShutdownGuard` created by the runtime.
    graceful_tasks: Arc<AtomicUsize>,
    /// General purpose rayon pool sized by the default thread count.
    #[cfg(feature = "rayon")]
    cpu_pool: rayon::ThreadPool,
    /// Rayon pool wrapped for RPC work.
    #[cfg(feature = "rayon")]
    rpc_pool: BlockingTaskPool,
    /// Rayon pool for storage work.
    #[cfg(feature = "rayon")]
    storage_pool: rayon::ThreadPool,
    /// Guard constructed from the configured `max_blocking_tasks`.
    #[cfg(feature = "rayon")]
    blocking_guard: BlockingTaskGuard,
    /// Worker pool for proof storage work.
    #[cfg(feature = "rayon")]
    proof_storage_worker_pool: WorkerPool,
    /// Worker pool for proof account work.
    #[cfg(feature = "rayon")]
    proof_account_worker_pool: WorkerPool,
    /// Worker pool for prewarming work.
    #[cfg(feature = "rayon")]
    prewarming_pool: WorkerPool,
    /// Worker pool for BAL streaming work.
    #[cfg(feature = "rayon")]
    bal_streaming_pool: WorkerPool,
    /// Worker pool for state trie overlay work; reference counted so callers can own a handle.
    #[cfg(feature = "rayon")]
    state_trie_overlay_worker_pool: Arc<WorkerPool>,
    /// Named workers used by the `spawn_blocking_named` family of methods.
    worker_map: WorkerMap,
    /// Join handle of the spawned task manager task; can be taken out exactly once.
    task_manager_handle: Mutex<Option<JoinHandle<Result<(), PanickedTaskError>>>>,
}
306
#[derive(Clone)]
/// Cheaply cloneable handle to the shared runtime state (tokio handle, pools, shutdown
/// signalling and metrics). Cloning only bumps the reference count of the inner `Arc`.
pub struct Runtime(Arc<RuntimeInner>);
317
318impl std::fmt::Debug for Runtime {
319 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
320 f.debug_struct("Runtime").field("handle", &self.0.handle).finish()
321 }
322}
323
324impl Runtime {
327 pub fn take_task_manager_handle(&self) -> Option<JoinHandle<Result<(), PanickedTaskError>>> {
333 self.0.task_manager_handle.lock().unwrap().take()
334 }
335
336 pub fn handle(&self) -> &Handle {
338 &self.0.handle
339 }
340
341 #[cfg(feature = "rayon")]
343 pub fn cpu_pool(&self) -> &rayon::ThreadPool {
344 &self.0.cpu_pool
345 }
346
347 #[cfg(feature = "rayon")]
349 pub fn rpc_pool(&self) -> &BlockingTaskPool {
350 &self.0.rpc_pool
351 }
352
353 #[cfg(feature = "rayon")]
355 pub fn storage_pool(&self) -> &rayon::ThreadPool {
356 &self.0.storage_pool
357 }
358
359 #[cfg(feature = "rayon")]
361 pub fn blocking_guard(&self) -> BlockingTaskGuard {
362 self.0.blocking_guard.clone()
363 }
364
365 #[cfg(feature = "rayon")]
367 pub fn proof_storage_worker_pool(&self) -> &WorkerPool {
368 &self.0.proof_storage_worker_pool
369 }
370
371 #[cfg(feature = "rayon")]
373 pub fn proof_account_worker_pool(&self) -> &WorkerPool {
374 &self.0.proof_account_worker_pool
375 }
376
377 #[cfg(feature = "rayon")]
379 pub fn prewarming_pool(&self) -> &WorkerPool {
380 &self.0.prewarming_pool
381 }
382
383 #[cfg(feature = "rayon")]
385 pub fn bal_streaming_pool(&self) -> &WorkerPool {
386 &self.0.bal_streaming_pool
387 }
388
389 #[cfg(feature = "rayon")]
391 pub fn state_trie_overlay_worker_pool(&self) -> Arc<WorkerPool> {
392 Arc::clone(&self.0.state_trie_overlay_worker_pool)
393 }
394}
395
396impl Runtime {
399 pub fn test() -> Self {
404 let config = match Handle::try_current() {
405 Ok(handle) => Self::test_config().with_tokio(TokioConfig::existing_handle(handle)),
406 Err(_) => Self::test_config(),
407 };
408 RuntimeBuilder::new(config).build().expect("failed to build test Runtime")
409 }
410
411 const fn test_config() -> RuntimeConfig {
412 RuntimeConfig {
413 tokio: TokioConfig::Owned {
414 worker_threads: Some(2),
415 thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
416 thread_name: "tokio-test",
417 },
418 #[cfg(feature = "rayon")]
419 rayon: RayonConfig {
420 cpu_threads: Some(2),
421 reserved_cpu_cores: 0,
422 rpc_threads: Some(2),
423 storage_threads: Some(2),
424 max_blocking_tasks: 16,
425 proof_storage_worker_threads: Some(2),
426 proof_account_worker_threads: Some(2),
427 prewarming_threads: Some(2),
428 bal_streaming_threads: Some(2),
429 state_trie_overlay_worker_threads: Some(2),
430 },
431 }
432 }
433}
434
/// Selects how a future is placed onto the tokio runtime.
enum TaskKind {
    /// Spawn the future as a regular async task.
    Default,
    /// Drive the future with `block_on` inside a tokio `spawn_blocking` task.
    Blocking,
}
444
445impl Runtime {
446 pub fn on_shutdown_signal(&self) -> &Shutdown {
448 &self.0.on_shutdown
449 }
450
451 fn spawn_on_rt<F>(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()>
453 where
454 F: Future<Output = ()> + Send + 'static,
455 {
456 match task_kind {
457 TaskKind::Default => self.0.handle.spawn(fut),
458 TaskKind::Blocking => {
459 let handle = self.0.handle.clone();
460 self.0.handle.spawn_blocking(move || handle.block_on(fut))
461 }
462 }
463 }
464
465 fn spawn_task_as<F>(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()>
467 where
468 F: Future<Output = ()> + Send + 'static,
469 {
470 match task_kind {
471 TaskKind::Default => self.0.metrics.inc_regular_tasks(),
472 TaskKind::Blocking => self.0.metrics.inc_regular_blocking_tasks(),
473 }
474 let on_shutdown = self.0.on_shutdown.clone();
475
476 let finished_counter = match task_kind {
477 TaskKind::Default => self.0.metrics.finished_regular_tasks_total.clone(),
478 TaskKind::Blocking => self.0.metrics.finished_regular_blocking_tasks_total.clone(),
479 };
480
481 let task = {
482 async move {
483 let _inc_counter_on_drop = IncCounterOnDrop::new(finished_counter);
484 let fut = pin!(fut);
485 let _ = select(on_shutdown, fut).await;
486 }
487 }
488 .in_current_span();
489
490 self.spawn_on_rt(task, task_kind)
491 }
492
493 pub fn spawn_task<F>(&self, fut: F) -> JoinHandle<()>
498 where
499 F: Future<Output = ()> + Send + 'static,
500 {
501 self.spawn_task_as(fut, TaskKind::Default)
502 }
503
504 pub fn spawn_blocking_task<F>(&self, fut: F) -> JoinHandle<()>
509 where
510 F: Future<Output = ()> + Send + 'static,
511 {
512 self.spawn_task_as(fut, TaskKind::Blocking)
513 }
514
515 pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
518 where
519 F: FnOnce() -> R + Send + 'static,
520 R: Send + 'static,
521 {
522 self.0.handle.spawn_blocking(func)
523 }
524
525 pub fn spawn_drop<T: Send + 'static>(&self, value: T) {
531 self.spawn_blocking_named("drop", move || drop(value));
532 }
533
534 pub fn spawn_blocking_named<F, R>(&self, name: &'static str, func: F) -> crate::LazyHandle<R>
546 where
547 F: FnOnce() -> R + Send + 'static,
548 R: Send + 'static,
549 {
550 crate::LazyHandle::new(self.0.worker_map.spawn_on(name, func))
551 }
552
553 pub fn try_spawn_blocking_named<F, R>(
558 &self,
559 name: &'static str,
560 func: F,
561 ) -> Option<crate::LazyHandle<R>>
562 where
563 F: FnOnce() -> R + Send + 'static,
564 R: Send + 'static,
565 {
566 self.0.worker_map.try_spawn_on(name, func).map(crate::LazyHandle::new)
567 }
568
569 pub fn spawn_blocking_named_or_tokio<F>(&self, name: &'static str, func: F) -> bool
574 where
575 F: FnOnce() + Send + 'static,
576 {
577 let func = Arc::new(parking_lot::Mutex::new(Some(func)));
578 let named_func = func.clone();
579
580 if self
581 .try_spawn_blocking_named(name, move || {
582 if let Some(func) = named_func.lock().take() {
583 func();
584 }
585 })
586 .is_some()
587 {
588 return true
589 }
590
591 if let Some(func) = func.lock().take() {
592 self.spawn_blocking(func);
593 }
594 false
595 }
596
597 pub fn spawn_with_signal<F>(&self, f: impl FnOnce(Shutdown) -> F) -> JoinHandle<()>
602 where
603 F: Future<Output = ()> + Send + 'static,
604 {
605 let on_shutdown = self.0.on_shutdown.clone();
606 let fut = f(on_shutdown);
607 let task = fut.in_current_span();
608 self.0.handle.spawn(task)
609 }
610
611 fn spawn_critical_as<F>(
613 &self,
614 name: &'static str,
615 fut: F,
616 task_kind: TaskKind,
617 ) -> JoinHandle<()>
618 where
619 F: Future<Output = ()> + Send + 'static,
620 {
621 self.0.metrics.inc_critical_tasks();
622 let panicked_tasks_tx = self.0.task_events_tx.clone();
623 let on_shutdown = self.0.on_shutdown.clone();
624
625 let task = std::panic::AssertUnwindSafe(fut)
627 .catch_unwind()
628 .map_err(move |error| {
629 let task_error = PanickedTaskError::new(name, error);
630 error!("{task_error}");
631 let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
632 })
633 .in_current_span();
634
635 let finished_critical_tasks_total_metrics =
636 self.0.metrics.finished_critical_tasks_total.clone();
637 let task = async move {
638 let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_total_metrics);
639 let task = pin!(task);
640 let _ = select(on_shutdown, task).await;
641 };
642
643 self.spawn_on_rt(task, task_kind)
644 }
645
646 pub fn spawn_critical_task<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
651 where
652 F: Future<Output = ()> + Send + 'static,
653 {
654 self.spawn_critical_as(name, fut, TaskKind::Default)
655 }
656
657 pub fn spawn_critical_blocking_task<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
662 where
663 F: Future<Output = ()> + Send + 'static,
664 {
665 self.spawn_critical_as(name, fut, TaskKind::Blocking)
666 }
667
668 pub fn spawn_critical_os_thread<F>(
673 &self,
674 thread_name: &'static str,
675 task_name: &'static str,
676 fut: F,
677 ) -> thread::JoinHandle<()>
678 where
679 F: Future<Output = ()> + Send + 'static,
680 {
681 self.0.metrics.inc_critical_tasks();
682 let handle = self.0.handle.clone();
683 let panicked_tasks_tx = self.0.task_events_tx.clone();
684 let on_shutdown = self.0.on_shutdown.clone();
685
686 let task = std::panic::AssertUnwindSafe(fut)
687 .catch_unwind()
688 .map_err(move |error| {
689 let task_error = PanickedTaskError::new(task_name, error);
690 error!("{task_error}");
691 let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
692 })
693 .in_current_span();
694
695 let finished_critical_tasks_total_metrics =
696 self.0.metrics.finished_critical_tasks_total.clone();
697 let task = async move {
698 let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_total_metrics);
699 let task = pin!(task);
700 let _ = select(on_shutdown, task).await;
701 };
702
703 thread::Builder::new()
704 .name(thread_name.to_string())
705 .spawn(move || {
706 let _guard = handle.enter();
707 handle.block_on(task);
708 })
709 .unwrap_or_else(|e| panic!("failed to spawn critical OS thread {thread_name:?}: {e}"))
710 }
711
712 pub fn spawn_critical_with_graceful_shutdown_signal<F>(
733 &self,
734 name: &'static str,
735 f: impl FnOnce(GracefulShutdown) -> F,
736 ) -> JoinHandle<()>
737 where
738 F: Future<Output = ()> + Send + 'static,
739 {
740 let panicked_tasks_tx = self.0.task_events_tx.clone();
741 let on_shutdown = GracefulShutdown::new(
742 self.0.on_shutdown.clone(),
743 GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
744 );
745 let fut = f(on_shutdown);
746
747 let task = std::panic::AssertUnwindSafe(fut)
749 .catch_unwind()
750 .map_err(move |error| {
751 let task_error = PanickedTaskError::new(name, error);
752 error!("{task_error}");
753 let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
754 })
755 .map(drop)
756 .in_current_span();
757
758 self.0.handle.spawn(task)
759 }
760
761 pub fn spawn_with_graceful_shutdown_signal<F>(
781 &self,
782 f: impl FnOnce(GracefulShutdown) -> F,
783 ) -> JoinHandle<()>
784 where
785 F: Future<Output = ()> + Send + 'static,
786 {
787 let on_shutdown = GracefulShutdown::new(
788 self.0.on_shutdown.clone(),
789 GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
790 );
791 let fut = f(on_shutdown);
792
793 self.0.handle.spawn(fut)
794 }
795
796 pub fn initiate_graceful_shutdown(
800 &self,
801 ) -> Result<GracefulShutdown, tokio::sync::mpsc::error::SendError<()>> {
802 self.0
803 .task_events_tx
804 .send(TaskEvent::GracefulShutdown)
805 .map_err(|_send_error_with_task_event| tokio::sync::mpsc::error::SendError(()))?;
806
807 Ok(GracefulShutdown::new(
808 self.0.on_shutdown.clone(),
809 GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
810 ))
811 }
812
813 pub fn graceful_shutdown(&self) {
815 let _ = self.do_graceful_shutdown(None);
816 }
817
818 pub fn graceful_shutdown_with_timeout(&self, timeout: Duration) -> bool {
823 self.do_graceful_shutdown(Some(timeout))
824 }
825
826 fn do_graceful_shutdown(&self, timeout: Option<Duration>) -> bool {
827 let _ = self.0.task_events_tx.send(TaskEvent::GracefulShutdown);
828 let deadline = timeout.map(|t| Instant::now() + t);
829 while self.0.graceful_tasks.load(Ordering::SeqCst) > 0 {
830 if deadline.is_some_and(|d| Instant::now() > d) {
831 debug!("graceful shutdown timed out");
832 return false;
833 }
834 std::thread::yield_now();
835 }
836 debug!("gracefully shut down");
837 true
838 }
839}
840
#[derive(Debug, Clone)]
/// Builds a `Runtime` from a `RuntimeConfig`.
pub struct RuntimeBuilder {
    /// Configuration consumed by `build`.
    config: RuntimeConfig,
}
848
849impl RuntimeBuilder {
850 pub const fn new(config: RuntimeConfig) -> Self {
852 Self { config }
853 }
854
    #[tracing::instrument(name = "RuntimeBuilder::build", level = "debug", skip_all)]
    /// Builds the [`Runtime`] described by the builder's configuration.
    ///
    /// Steps, in order: obtain a tokio handle (building an owned runtime if configured),
    /// create the task manager parts, build the rayon pools and worker pools (only with the
    /// `rayon` feature), spawn the task manager onto the tokio runtime and assemble the shared
    /// inner state.
    ///
    /// # Errors
    ///
    /// Returns [`RuntimeBuildError::TokioBuild`] if the owned tokio runtime cannot be built and,
    /// with the `rayon` feature, `RuntimeBuildError::RayonBuild` if a rayon pool cannot be built.
    pub fn build(self) -> Result<Runtime, RuntimeBuildError> {
        debug!(?self.config, "Building runtime");
        let config = self.config;

        // Either build a runtime that the `Runtime` will own, or just reuse the given handle.
        let (owned_runtime, handle) = match &config.tokio {
            TokioConfig::Owned { worker_threads, thread_keep_alive, thread_name } => {
                let mut builder = tokio::runtime::Builder::new_multi_thread();
                builder
                    .enable_all()
                    .thread_keep_alive(*thread_keep_alive)
                    .thread_name(*thread_name);

                // Without an explicit count the tokio builder keeps its own default.
                if let Some(threads) = worker_threads {
                    builder.worker_threads(*threads);
                }

                let runtime = builder.build()?;
                let h = runtime.handle().clone();
                (Some(runtime), h)
            }
            TokioConfig::ExistingHandle(h) => (None, h.clone()),
        };

        let (task_manager, on_shutdown, task_events_tx, graceful_tasks) =
            TaskManager::new_parts(handle.clone());

        #[cfg(feature = "rayon")]
        let (
            cpu_pool,
            rpc_pool,
            storage_pool,
            blocking_guard,
            proof_storage_worker_pool,
            proof_account_worker_pool,
            prewarming_pool,
            bal_streaming_pool,
            state_trie_overlay_worker_pool,
        ) = {
            // Base thread count that most pools fall back to when not configured explicitly.
            let default_threads = config.rayon.default_thread_count();
            let rpc_threads = config.rayon.rpc_threads.unwrap_or(default_threads);

            let cpu_pool = build_pool_with_panic_handler(
                rayon::ThreadPoolBuilder::new()
                    .num_threads(default_threads)
                    .thread_name(|i| format!("cpu-{i:02}")),
            )?;

            let rpc_raw = build_pool_with_panic_handler(
                rayon::ThreadPoolBuilder::new()
                    .num_threads(rpc_threads)
                    .thread_name(|i| format!("rpc-{i:02}")),
            )?;
            let rpc_pool = BlockingTaskPool::new(rpc_raw);

            // The storage pool has its own fixed fallback instead of the base thread count.
            let storage_threads =
                config.rayon.storage_threads.unwrap_or(DEFAULT_STORAGE_POOL_THREADS);
            let storage_pool = build_pool_with_panic_handler(
                rayon::ThreadPoolBuilder::new()
                    .num_threads(storage_threads)
                    .thread_name(|i| format!("storage-{i:02}")),
            )?;

            let blocking_guard = BlockingTaskGuard::new(config.rayon.max_blocking_tasks);

            // Both proof worker pools fall back to twice the base thread count.
            let proof_storage_worker_threads =
                config.rayon.proof_storage_worker_threads.unwrap_or(default_threads * 2);
            let proof_storage_worker_pool =
                WorkerPool::new(proof_storage_worker_threads, "proof-strg");

            let proof_account_worker_threads =
                config.rayon.proof_account_worker_threads.unwrap_or(default_threads * 2);
            let proof_account_worker_pool =
                WorkerPool::new(proof_account_worker_threads, "proof-acct");

            let prewarming_threads = config.rayon.prewarming_threads.unwrap_or(default_threads);
            let prewarming_pool = WorkerPool::new(prewarming_threads, "prewarm");

            let bal_streaming_threads =
                config.rayon.bal_streaming_threads.unwrap_or(default_threads);
            let bal_streaming_pool = WorkerPool::new(bal_streaming_threads, "bal-stream");

            let state_trie_overlay_worker_threads = config
                .rayon
                .state_trie_overlay_worker_threads
                .unwrap_or(DEFAULT_STATE_TRIE_OVERLAY_WORKER_THREADS);
            let state_trie_overlay_worker_pool =
                Arc::new(WorkerPool::new(state_trie_overlay_worker_threads, "state-ovly"));

            debug!(
                default_threads,
                rpc_threads,
                storage_threads,
                proof_storage_worker_threads,
                proof_account_worker_threads,
                prewarming_threads,
                bal_streaming_threads,
                state_trie_overlay_worker_threads,
                max_blocking_tasks = config.rayon.max_blocking_tasks,
                "Configured lazy rayon worker pools"
            );

            (
                cpu_pool,
                rpc_pool,
                storage_pool,
                blocking_guard,
                proof_storage_worker_pool,
                proof_account_worker_pool,
                prewarming_pool,
                bal_streaming_pool,
                state_trie_overlay_worker_pool,
            )
        };

        // Drive the task manager on the runtime; its result is logged and passed through so it
        // can be observed via the stored join handle.
        let task_manager_handle = handle.spawn(async move {
            let result = task_manager.await;
            if let Err(ref err) = result {
                debug!("{err}");
            }
            result
        });

        let inner = RuntimeInner {
            _tokio_runtime: owned_runtime,
            handle,
            on_shutdown,
            task_events_tx,
            metrics: Default::default(),
            graceful_tasks,
            #[cfg(feature = "rayon")]
            cpu_pool,
            #[cfg(feature = "rayon")]
            rpc_pool,
            #[cfg(feature = "rayon")]
            storage_pool,
            #[cfg(feature = "rayon")]
            blocking_guard,
            #[cfg(feature = "rayon")]
            proof_storage_worker_pool,
            #[cfg(feature = "rayon")]
            proof_account_worker_pool,
            #[cfg(feature = "rayon")]
            prewarming_pool,
            #[cfg(feature = "rayon")]
            bal_streaming_pool,
            #[cfg(feature = "rayon")]
            state_trie_overlay_worker_pool,
            worker_map: WorkerMap::new(),
            task_manager_handle: Mutex::new(Some(task_manager_handle)),
        };

        Ok(Runtime(Arc::new(inner)))
    }
1014}
1015
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the test configuration attached to the handle of `rt` instead of owning a runtime.
    fn config_attached_to(rt: &TokioRuntime) -> RuntimeConfig {
        let attached = TokioConfig::existing_handle(rt.handle().clone());
        Runtime::test_config().with_tokio(attached)
    }

    /// The default configuration owns its tokio runtime.
    #[test]
    fn test_runtime_config_default() {
        let RuntimeConfig { tokio, .. } = RuntimeConfig::default();
        assert!(matches!(tokio, TokioConfig::Owned { .. }));
    }

    /// Overriding the tokio configuration with an existing handle is reflected in the config.
    #[test]
    fn test_runtime_config_existing_handle() {
        let rt = TokioRuntime::new().unwrap();
        let config = config_attached_to(&rt);
        assert!(matches!(config.tokio, TokioConfig::ExistingHandle(_)));
    }

    /// The default thread count is never zero.
    #[cfg(feature = "rayon")]
    #[test]
    fn test_rayon_config_thread_count() {
        let threads = RayonConfig::default().default_thread_count();
        assert!(threads >= 1);
    }

    /// A runtime can be built on top of an existing tokio runtime.
    #[test]
    fn test_runtime_builder() {
        let rt = TokioRuntime::new().unwrap();
        let runtime = RuntimeBuilder::new(config_attached_to(&rt)).build().unwrap();
        let _ = runtime.handle();
    }

    /// The dedicated OS thread carries the requested thread name.
    #[test]
    fn critical_os_thread_uses_requested_name() {
        let runtime = Runtime::test();
        let (name_tx, name_rx) = std::sync::mpsc::channel();

        let os_thread = runtime.spawn_critical_os_thread(
            "critical-os-test",
            "critical os thread test",
            async move {
                let observed = thread::current().name().unwrap().to_string();
                name_tx.send(observed).unwrap();
            },
        );

        let observed = name_rx.recv_timeout(Duration::from_secs(5)).unwrap();
        assert_eq!(observed, "critical-os-test");
        os_thread.join().unwrap();
    }

    /// A panic on the dedicated OS thread surfaces as the task manager's error.
    #[test]
    fn critical_os_thread_panic_is_reported() {
        let runtime = Runtime::test();
        let manager = runtime.take_task_manager_handle().unwrap();

        let os_thread = runtime.spawn_critical_os_thread(
            "critical-os-panic",
            "critical os thread panic test",
            async { panic!("critical os thread panic") },
        );

        let reported = runtime.handle().block_on(async move {
            let manager_result = manager.await.unwrap();
            manager_result.unwrap_err()
        });
        assert_eq!(reported.task_name, "critical os thread panic test");
        assert_eq!(reported.error, Some("critical os thread panic".to_string()));
        os_thread.join().unwrap();
    }

    /// Worker pools report uninitialized after the build and become initialized on first use.
    #[cfg(feature = "rayon")]
    #[test]
    fn test_worker_pools_are_lazy() {
        let runtime = Runtime::test();
        let inner = &runtime.0;

        assert!(!inner.bal_streaming_pool.is_initialized());
        assert!(!inner.proof_storage_worker_pool.is_initialized());
        assert!(!inner.state_trie_overlay_worker_pool.is_initialized());

        // Querying the thread count is what initializes the pool.
        assert_eq!(runtime.bal_streaming_pool().current_num_threads(), 2);
        assert!(inner.bal_streaming_pool.is_initialized());

        assert_eq!(runtime.proof_storage_worker_pool().current_num_threads(), 2);
        assert_eq!(runtime.proof_account_worker_pool().current_num_threads(), 2);
        assert_eq!(runtime.prewarming_pool().current_num_threads(), 2);
        assert_eq!(runtime.state_trie_overlay_worker_pool().current_num_threads(), 2);
    }
}