1#[cfg(feature = "rayon")]
10use crate::pool::{build_pool_with_panic_handler, BlockingTaskGuard, BlockingTaskPool, WorkerPool};
11use crate::{
12 metrics::{IncCounterOnDrop, TaskExecutorMetrics},
13 shutdown::{GracefulShutdown, GracefulShutdownGuard, Shutdown},
14 worker_map::WorkerMap,
15 PanickedTaskError, TaskEvent, TaskManager,
16};
17use futures_util::{future::select, Future, FutureExt, TryFutureExt};
18#[cfg(feature = "rayon")]
19use std::{num::NonZeroUsize, thread::available_parallelism};
20use std::{
21 pin::pin,
22 sync::{
23 atomic::{AtomicUsize, Ordering},
24 Arc, Mutex,
25 },
26 thread,
27 time::{Duration, Instant},
28};
29use tokio::{runtime::Handle, sync::mpsc::UnboundedSender, task::JoinHandle};
30use tracing::{debug, error};
31use tracing_futures::Instrument;
32
33use tokio::runtime::Runtime as TokioRuntime;
34
/// Keep-alive duration applied to the threads of an owned tokio runtime by default.
pub const DEFAULT_THREAD_KEEP_ALIVE: Duration = Duration::from_secs(15);

/// Default value of `RayonConfig::reserved_cpu_cores`.
pub const DEFAULT_RESERVED_CPU_CORES: usize = 2;

/// Number of storage pool threads used when `RayonConfig::storage_threads` is unset.
pub const DEFAULT_STORAGE_POOL_THREADS: usize = 16;

/// Default value of `RayonConfig::max_blocking_tasks`.
pub const DEFAULT_MAX_BLOCKING_TASKS: usize = 512;
46
47#[derive(Debug, Clone)]
49pub enum TokioConfig {
50 Owned {
52 worker_threads: Option<usize>,
54 thread_keep_alive: Duration,
56 thread_name: &'static str,
58 },
59 ExistingHandle(Handle),
61}
62
63impl Default for TokioConfig {
64 fn default() -> Self {
65 Self::Owned {
66 worker_threads: None,
67 thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
68 thread_name: "tokio-rt",
69 }
70 }
71}
72
73impl TokioConfig {
74 pub const fn existing_handle(handle: Handle) -> Self {
76 Self::ExistingHandle(handle)
77 }
78
79 pub const fn with_worker_threads(worker_threads: usize) -> Self {
81 Self::Owned {
82 worker_threads: Some(worker_threads),
83 thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
84 thread_name: "tokio-rt",
85 }
86 }
87}
88
/// Thread-count settings for the rayon pools and worker pools created by the runtime builder.
///
/// Every `Option` field falls back to a value derived at build time when it is `None`.
#[derive(Debug, Clone)]
#[cfg(feature = "rayon")]
pub struct RayonConfig {
    /// Threads of the CPU pool. This is also the base "default thread count" other pools
    /// fall back to; when `None`, the available parallelism is used (1 if it is unknown).
    pub cpu_threads: Option<usize>,
    /// Number of CPU cores to keep free.
    ///
    /// NOTE(review): the value is currently read but not applied when the default thread
    /// count is computed.
    pub reserved_cpu_cores: usize,
    /// Threads of the RPC pool; defaults to the default thread count.
    pub rpc_threads: Option<usize>,
    /// Threads of the storage pool; defaults to `DEFAULT_STORAGE_POOL_THREADS`.
    pub storage_threads: Option<usize>,
    /// Limit handed to the blocking task guard.
    pub max_blocking_tasks: usize,
    /// Threads of the proof storage worker pool; defaults to twice the default thread count.
    pub proof_storage_worker_threads: Option<usize>,
    /// Threads of the proof account worker pool; defaults to twice the default thread count.
    pub proof_account_worker_threads: Option<usize>,
    /// Threads of the prewarming pool; defaults to the default thread count.
    pub prewarming_threads: Option<usize>,
    /// Threads of the BAL streaming pool; defaults to the default thread count.
    pub bal_streaming_threads: Option<usize>,
}
119
#[cfg(feature = "rayon")]
impl Default for RayonConfig {
    /// Leaves every thread count unset and uses the crate defaults for the reserved cores
    /// and the blocking task limit.
    fn default() -> Self {
        Self {
            // Fixed defaults.
            reserved_cpu_cores: DEFAULT_RESERVED_CPU_CORES,
            max_blocking_tasks: DEFAULT_MAX_BLOCKING_TASKS,
            // Everything else is resolved when the runtime is built.
            cpu_threads: None,
            rpc_threads: None,
            storage_threads: None,
            proof_storage_worker_threads: None,
            proof_account_worker_threads: None,
            prewarming_threads: None,
            bal_streaming_threads: None,
        }
    }
}
136
#[cfg(feature = "rayon")]
impl RayonConfig {
    /// Sets the number of reserved CPU cores.
    pub const fn with_reserved_cpu_cores(self, reserved_cpu_cores: usize) -> Self {
        Self { reserved_cpu_cores, ..self }
    }

    /// Sets the limit handed to the blocking task guard.
    pub const fn with_max_blocking_tasks(self, max_blocking_tasks: usize) -> Self {
        Self { max_blocking_tasks, ..self }
    }

    /// Sets an explicit thread count for the RPC pool.
    pub const fn with_rpc_threads(self, rpc_threads: usize) -> Self {
        Self { rpc_threads: Some(rpc_threads), ..self }
    }

    /// Sets an explicit thread count for the storage pool.
    pub const fn with_storage_threads(self, storage_threads: usize) -> Self {
        Self { storage_threads: Some(storage_threads), ..self }
    }

    /// Sets an explicit thread count for the proof storage worker pool.
    pub const fn with_proof_storage_worker_threads(
        self,
        proof_storage_worker_threads: usize,
    ) -> Self {
        Self { proof_storage_worker_threads: Some(proof_storage_worker_threads), ..self }
    }

    /// Sets an explicit thread count for the proof account worker pool.
    pub const fn with_proof_account_worker_threads(
        self,
        proof_account_worker_threads: usize,
    ) -> Self {
        Self { proof_account_worker_threads: Some(proof_account_worker_threads), ..self }
    }

    /// Sets an explicit thread count for the prewarming pool.
    pub const fn with_prewarming_threads(self, prewarming_threads: usize) -> Self {
        Self { prewarming_threads: Some(prewarming_threads), ..self }
    }

    /// Sets an explicit thread count for the BAL streaming pool.
    pub const fn with_bal_streaming_threads(self, bal_streaming_threads: usize) -> Self {
        Self { bal_streaming_threads: Some(bal_streaming_threads), ..self }
    }

    /// Returns the base thread count: `cpu_threads` when set, otherwise the available
    /// parallelism, or 1 when that cannot be determined.
    fn default_thread_count(&self) -> usize {
        // NOTE(review): `reserved_cpu_cores` is read and discarded, so it has no effect on
        // the result — confirm whether that is still intended.
        let _ = self.reserved_cpu_cores;
        match self.cpu_threads {
            Some(threads) => threads,
            None => match available_parallelism() {
                Ok(cores) => cores.get(),
                Err(_) => 1,
            },
        }
    }
}
201
202#[derive(Debug, Clone, Default)]
204pub struct RuntimeConfig {
205 pub tokio: TokioConfig,
207 #[cfg(feature = "rayon")]
209 pub rayon: RayonConfig,
210}
211
212impl RuntimeConfig {
213 pub fn with_tokio(mut self, tokio: TokioConfig) -> Self {
215 self.tokio = tokio;
216 self
217 }
218
219 #[cfg(feature = "rayon")]
221 pub const fn with_rayon(mut self, rayon: RayonConfig) -> Self {
222 self.rayon = rayon;
223 self
224 }
225}
226
227#[derive(Debug, thiserror::Error)]
229pub enum RuntimeBuildError {
230 #[error("Failed to build tokio runtime: {0}")]
232 TokioBuild(#[from] std::io::Error),
233 #[cfg(feature = "rayon")]
235 #[error("Failed to build rayon thread pool: {0}")]
236 RayonBuild(#[from] rayon::ThreadPoolBuildError),
237}
238
239struct RuntimeInner {
242 _tokio_runtime: Option<TokioRuntime>,
244 handle: Handle,
246 on_shutdown: Shutdown,
248 task_events_tx: UnboundedSender<TaskEvent>,
250 metrics: TaskExecutorMetrics,
252 graceful_tasks: Arc<AtomicUsize>,
254 #[cfg(feature = "rayon")]
256 cpu_pool: rayon::ThreadPool,
257 #[cfg(feature = "rayon")]
259 rpc_pool: BlockingTaskPool,
260 #[cfg(feature = "rayon")]
262 storage_pool: rayon::ThreadPool,
263 #[cfg(feature = "rayon")]
265 blocking_guard: BlockingTaskGuard,
266 #[cfg(feature = "rayon")]
268 proof_storage_worker_pool: WorkerPool,
269 #[cfg(feature = "rayon")]
271 proof_account_worker_pool: WorkerPool,
272 #[cfg(feature = "rayon")]
274 prewarming_pool: WorkerPool,
275 #[cfg(feature = "rayon")]
277 bal_streaming_pool: WorkerPool,
278 worker_map: WorkerMap,
281 task_manager_handle: Mutex<Option<JoinHandle<Result<(), PanickedTaskError>>>>,
285}
286
287#[derive(Clone)]
296pub struct Runtime(Arc<RuntimeInner>);
297
298impl std::fmt::Debug for Runtime {
299 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
300 f.debug_struct("Runtime").field("handle", &self.0.handle).finish()
301 }
302}
303
304impl Runtime {
307 pub fn take_task_manager_handle(&self) -> Option<JoinHandle<Result<(), PanickedTaskError>>> {
313 self.0.task_manager_handle.lock().unwrap().take()
314 }
315
316 pub fn handle(&self) -> &Handle {
318 &self.0.handle
319 }
320
321 #[cfg(feature = "rayon")]
323 pub fn cpu_pool(&self) -> &rayon::ThreadPool {
324 &self.0.cpu_pool
325 }
326
327 #[cfg(feature = "rayon")]
329 pub fn rpc_pool(&self) -> &BlockingTaskPool {
330 &self.0.rpc_pool
331 }
332
333 #[cfg(feature = "rayon")]
335 pub fn storage_pool(&self) -> &rayon::ThreadPool {
336 &self.0.storage_pool
337 }
338
339 #[cfg(feature = "rayon")]
341 pub fn blocking_guard(&self) -> BlockingTaskGuard {
342 self.0.blocking_guard.clone()
343 }
344
345 #[cfg(feature = "rayon")]
347 pub fn proof_storage_worker_pool(&self) -> &WorkerPool {
348 &self.0.proof_storage_worker_pool
349 }
350
351 #[cfg(feature = "rayon")]
353 pub fn proof_account_worker_pool(&self) -> &WorkerPool {
354 &self.0.proof_account_worker_pool
355 }
356
357 #[cfg(feature = "rayon")]
359 pub fn prewarming_pool(&self) -> &WorkerPool {
360 &self.0.prewarming_pool
361 }
362
363 #[cfg(feature = "rayon")]
365 pub fn bal_streaming_pool(&self) -> &WorkerPool {
366 &self.0.bal_streaming_pool
367 }
368}
369
370impl Runtime {
373 pub fn test() -> Self {
378 let config = match Handle::try_current() {
379 Ok(handle) => Self::test_config().with_tokio(TokioConfig::existing_handle(handle)),
380 Err(_) => Self::test_config(),
381 };
382 RuntimeBuilder::new(config).build().expect("failed to build test Runtime")
383 }
384
385 const fn test_config() -> RuntimeConfig {
386 RuntimeConfig {
387 tokio: TokioConfig::Owned {
388 worker_threads: Some(2),
389 thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
390 thread_name: "tokio-test",
391 },
392 #[cfg(feature = "rayon")]
393 rayon: RayonConfig {
394 cpu_threads: Some(2),
395 reserved_cpu_cores: 0,
396 rpc_threads: Some(2),
397 storage_threads: Some(2),
398 max_blocking_tasks: 16,
399 proof_storage_worker_threads: Some(2),
400 proof_account_worker_threads: Some(2),
401 prewarming_threads: Some(2),
402 bal_streaming_threads: Some(2),
403 },
404 }
405 }
406}
407
/// Decides how a task future is driven on the tokio runtime.
enum TaskKind {
    /// Spawn the future as a regular async task.
    Default,
    /// Drive the future to completion with `block_on` inside a blocking task.
    Blocking,
}
417
418impl Runtime {
419 pub fn on_shutdown_signal(&self) -> &Shutdown {
421 &self.0.on_shutdown
422 }
423
424 fn spawn_on_rt<F>(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()>
426 where
427 F: Future<Output = ()> + Send + 'static,
428 {
429 match task_kind {
430 TaskKind::Default => self.0.handle.spawn(fut),
431 TaskKind::Blocking => {
432 let handle = self.0.handle.clone();
433 self.0.handle.spawn_blocking(move || handle.block_on(fut))
434 }
435 }
436 }
437
438 fn spawn_task_as<F>(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()>
440 where
441 F: Future<Output = ()> + Send + 'static,
442 {
443 match task_kind {
444 TaskKind::Default => self.0.metrics.inc_regular_tasks(),
445 TaskKind::Blocking => self.0.metrics.inc_regular_blocking_tasks(),
446 }
447 let on_shutdown = self.0.on_shutdown.clone();
448
449 let finished_counter = match task_kind {
450 TaskKind::Default => self.0.metrics.finished_regular_tasks_total.clone(),
451 TaskKind::Blocking => self.0.metrics.finished_regular_blocking_tasks_total.clone(),
452 };
453
454 let task = {
455 async move {
456 let _inc_counter_on_drop = IncCounterOnDrop::new(finished_counter);
457 let fut = pin!(fut);
458 let _ = select(on_shutdown, fut).await;
459 }
460 }
461 .in_current_span();
462
463 self.spawn_on_rt(task, task_kind)
464 }
465
466 pub fn spawn_task<F>(&self, fut: F) -> JoinHandle<()>
471 where
472 F: Future<Output = ()> + Send + 'static,
473 {
474 self.spawn_task_as(fut, TaskKind::Default)
475 }
476
477 pub fn spawn_blocking_task<F>(&self, fut: F) -> JoinHandle<()>
482 where
483 F: Future<Output = ()> + Send + 'static,
484 {
485 self.spawn_task_as(fut, TaskKind::Blocking)
486 }
487
488 pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
491 where
492 F: FnOnce() -> R + Send + 'static,
493 R: Send + 'static,
494 {
495 self.0.handle.spawn_blocking(func)
496 }
497
498 pub fn spawn_drop<T: Send + 'static>(&self, value: T) {
504 self.spawn_blocking_named("drop", move || drop(value));
505 }
506
507 pub fn spawn_blocking_named<F, R>(&self, name: &'static str, func: F) -> crate::LazyHandle<R>
519 where
520 F: FnOnce() -> R + Send + 'static,
521 R: Send + 'static,
522 {
523 crate::LazyHandle::new(self.0.worker_map.spawn_on(name, func))
524 }
525
526 pub fn spawn_with_signal<F>(&self, f: impl FnOnce(Shutdown) -> F) -> JoinHandle<()>
531 where
532 F: Future<Output = ()> + Send + 'static,
533 {
534 let on_shutdown = self.0.on_shutdown.clone();
535 let fut = f(on_shutdown);
536 let task = fut.in_current_span();
537 self.0.handle.spawn(task)
538 }
539
540 fn spawn_critical_as<F>(
542 &self,
543 name: &'static str,
544 fut: F,
545 task_kind: TaskKind,
546 ) -> JoinHandle<()>
547 where
548 F: Future<Output = ()> + Send + 'static,
549 {
550 self.0.metrics.inc_critical_tasks();
551 let panicked_tasks_tx = self.0.task_events_tx.clone();
552 let on_shutdown = self.0.on_shutdown.clone();
553
554 let task = std::panic::AssertUnwindSafe(fut)
556 .catch_unwind()
557 .map_err(move |error| {
558 let task_error = PanickedTaskError::new(name, error);
559 error!("{task_error}");
560 let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
561 })
562 .in_current_span();
563
564 let finished_critical_tasks_total_metrics =
565 self.0.metrics.finished_critical_tasks_total.clone();
566 let task = async move {
567 let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_total_metrics);
568 let task = pin!(task);
569 let _ = select(on_shutdown, task).await;
570 };
571
572 self.spawn_on_rt(task, task_kind)
573 }
574
575 pub fn spawn_critical_task<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
580 where
581 F: Future<Output = ()> + Send + 'static,
582 {
583 self.spawn_critical_as(name, fut, TaskKind::Default)
584 }
585
586 pub fn spawn_critical_blocking_task<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
591 where
592 F: Future<Output = ()> + Send + 'static,
593 {
594 self.spawn_critical_as(name, fut, TaskKind::Blocking)
595 }
596
597 pub fn spawn_critical_os_thread<F>(
602 &self,
603 thread_name: &'static str,
604 task_name: &'static str,
605 fut: F,
606 ) -> thread::JoinHandle<()>
607 where
608 F: Future<Output = ()> + Send + 'static,
609 {
610 self.0.metrics.inc_critical_tasks();
611 let handle = self.0.handle.clone();
612 let panicked_tasks_tx = self.0.task_events_tx.clone();
613 let on_shutdown = self.0.on_shutdown.clone();
614
615 let task = std::panic::AssertUnwindSafe(fut)
616 .catch_unwind()
617 .map_err(move |error| {
618 let task_error = PanickedTaskError::new(task_name, error);
619 error!("{task_error}");
620 let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
621 })
622 .in_current_span();
623
624 let finished_critical_tasks_total_metrics =
625 self.0.metrics.finished_critical_tasks_total.clone();
626 let task = async move {
627 let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_total_metrics);
628 let task = pin!(task);
629 let _ = select(on_shutdown, task).await;
630 };
631
632 thread::Builder::new()
633 .name(thread_name.to_string())
634 .spawn(move || {
635 let _guard = handle.enter();
636 handle.block_on(task);
637 })
638 .unwrap_or_else(|e| panic!("failed to spawn critical OS thread {thread_name:?}: {e}"))
639 }
640
641 pub fn spawn_critical_with_graceful_shutdown_signal<F>(
662 &self,
663 name: &'static str,
664 f: impl FnOnce(GracefulShutdown) -> F,
665 ) -> JoinHandle<()>
666 where
667 F: Future<Output = ()> + Send + 'static,
668 {
669 let panicked_tasks_tx = self.0.task_events_tx.clone();
670 let on_shutdown = GracefulShutdown::new(
671 self.0.on_shutdown.clone(),
672 GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
673 );
674 let fut = f(on_shutdown);
675
676 let task = std::panic::AssertUnwindSafe(fut)
678 .catch_unwind()
679 .map_err(move |error| {
680 let task_error = PanickedTaskError::new(name, error);
681 error!("{task_error}");
682 let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
683 })
684 .map(drop)
685 .in_current_span();
686
687 self.0.handle.spawn(task)
688 }
689
690 pub fn spawn_with_graceful_shutdown_signal<F>(
710 &self,
711 f: impl FnOnce(GracefulShutdown) -> F,
712 ) -> JoinHandle<()>
713 where
714 F: Future<Output = ()> + Send + 'static,
715 {
716 let on_shutdown = GracefulShutdown::new(
717 self.0.on_shutdown.clone(),
718 GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
719 );
720 let fut = f(on_shutdown);
721
722 self.0.handle.spawn(fut)
723 }
724
725 pub fn initiate_graceful_shutdown(
729 &self,
730 ) -> Result<GracefulShutdown, tokio::sync::mpsc::error::SendError<()>> {
731 self.0
732 .task_events_tx
733 .send(TaskEvent::GracefulShutdown)
734 .map_err(|_send_error_with_task_event| tokio::sync::mpsc::error::SendError(()))?;
735
736 Ok(GracefulShutdown::new(
737 self.0.on_shutdown.clone(),
738 GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
739 ))
740 }
741
742 pub fn graceful_shutdown(&self) {
744 let _ = self.do_graceful_shutdown(None);
745 }
746
747 pub fn graceful_shutdown_with_timeout(&self, timeout: Duration) -> bool {
752 self.do_graceful_shutdown(Some(timeout))
753 }
754
755 fn do_graceful_shutdown(&self, timeout: Option<Duration>) -> bool {
756 let _ = self.0.task_events_tx.send(TaskEvent::GracefulShutdown);
757 let deadline = timeout.map(|t| Instant::now() + t);
758 while self.0.graceful_tasks.load(Ordering::SeqCst) > 0 {
759 if deadline.is_some_and(|d| Instant::now() > d) {
760 debug!("graceful shutdown timed out");
761 return false;
762 }
763 std::thread::yield_now();
764 }
765 debug!("gracefully shut down");
766 true
767 }
768}
769
770#[derive(Debug, Clone)]
774pub struct RuntimeBuilder {
775 config: RuntimeConfig,
776}
777
778impl RuntimeBuilder {
779 pub const fn new(config: RuntimeConfig) -> Self {
781 Self { config }
782 }
783
784 #[tracing::instrument(name = "RuntimeBuilder::build", level = "debug", skip_all)]
790 pub fn build(self) -> Result<Runtime, RuntimeBuildError> {
791 debug!(?self.config, "Building runtime");
792 let config = self.config;
793
794 let (owned_runtime, handle) = match &config.tokio {
795 TokioConfig::Owned { worker_threads, thread_keep_alive, thread_name } => {
796 let mut builder = tokio::runtime::Builder::new_multi_thread();
797 builder
798 .enable_all()
799 .thread_keep_alive(*thread_keep_alive)
800 .thread_name(*thread_name);
801
802 if let Some(threads) = worker_threads {
803 builder.worker_threads(*threads);
804 }
805
806 let runtime = builder.build()?;
807 let h = runtime.handle().clone();
808 (Some(runtime), h)
809 }
810 TokioConfig::ExistingHandle(h) => (None, h.clone()),
811 };
812
813 let (task_manager, on_shutdown, task_events_tx, graceful_tasks) =
814 TaskManager::new_parts(handle.clone());
815
816 #[cfg(feature = "rayon")]
817 let (
818 cpu_pool,
819 rpc_pool,
820 storage_pool,
821 blocking_guard,
822 proof_storage_worker_pool,
823 proof_account_worker_pool,
824 prewarming_pool,
825 bal_streaming_pool,
826 ) = {
827 let default_threads = config.rayon.default_thread_count();
828 let rpc_threads = config.rayon.rpc_threads.unwrap_or(default_threads);
829
830 let cpu_pool = build_pool_with_panic_handler(
831 rayon::ThreadPoolBuilder::new()
832 .num_threads(default_threads)
833 .thread_name(|i| format!("cpu-{i:02}")),
834 )?;
835
836 let rpc_raw = build_pool_with_panic_handler(
837 rayon::ThreadPoolBuilder::new()
838 .num_threads(rpc_threads)
839 .thread_name(|i| format!("rpc-{i:02}")),
840 )?;
841 let rpc_pool = BlockingTaskPool::new(rpc_raw);
842
843 let storage_threads =
844 config.rayon.storage_threads.unwrap_or(DEFAULT_STORAGE_POOL_THREADS);
845 let storage_pool = build_pool_with_panic_handler(
846 rayon::ThreadPoolBuilder::new()
847 .num_threads(storage_threads)
848 .thread_name(|i| format!("storage-{i:02}")),
849 )?;
850
851 let blocking_guard = BlockingTaskGuard::new(config.rayon.max_blocking_tasks);
852
853 let proof_storage_worker_threads =
854 config.rayon.proof_storage_worker_threads.unwrap_or(default_threads * 2);
855 let proof_storage_worker_pool =
856 WorkerPool::new(proof_storage_worker_threads, "proof-strg");
857
858 let proof_account_worker_threads =
859 config.rayon.proof_account_worker_threads.unwrap_or(default_threads * 2);
860 let proof_account_worker_pool =
861 WorkerPool::new(proof_account_worker_threads, "proof-acct");
862
863 let prewarming_threads = config.rayon.prewarming_threads.unwrap_or(default_threads);
864 let prewarming_pool = WorkerPool::new(prewarming_threads, "prewarm");
865
866 let bal_streaming_threads =
867 config.rayon.bal_streaming_threads.unwrap_or(default_threads);
868 let bal_streaming_pool = WorkerPool::new(bal_streaming_threads, "bal-stream");
869
870 debug!(
871 default_threads,
872 rpc_threads,
873 storage_threads,
874 proof_storage_worker_threads,
875 proof_account_worker_threads,
876 prewarming_threads,
877 bal_streaming_threads,
878 max_blocking_tasks = config.rayon.max_blocking_tasks,
879 "Configured lazy rayon worker pools"
880 );
881
882 (
883 cpu_pool,
884 rpc_pool,
885 storage_pool,
886 blocking_guard,
887 proof_storage_worker_pool,
888 proof_account_worker_pool,
889 prewarming_pool,
890 bal_streaming_pool,
891 )
892 };
893
894 let task_manager_handle = handle.spawn(async move {
895 let result = task_manager.await;
896 if let Err(ref err) = result {
897 debug!("{err}");
898 }
899 result
900 });
901
902 let inner = RuntimeInner {
903 _tokio_runtime: owned_runtime,
904 handle,
905 on_shutdown,
906 task_events_tx,
907 metrics: Default::default(),
908 graceful_tasks,
909 #[cfg(feature = "rayon")]
910 cpu_pool,
911 #[cfg(feature = "rayon")]
912 rpc_pool,
913 #[cfg(feature = "rayon")]
914 storage_pool,
915 #[cfg(feature = "rayon")]
916 blocking_guard,
917 #[cfg(feature = "rayon")]
918 proof_storage_worker_pool,
919 #[cfg(feature = "rayon")]
920 proof_account_worker_pool,
921 #[cfg(feature = "rayon")]
922 prewarming_pool,
923 #[cfg(feature = "rayon")]
924 bal_streaming_pool,
925 worker_map: WorkerMap::new(),
926 task_manager_handle: Mutex::new(Some(task_manager_handle)),
927 };
928
929 Ok(Runtime(Arc::new(inner)))
930 }
931}
932
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration asks for an owned tokio runtime.
    #[test]
    fn test_runtime_config_default() {
        let config = RuntimeConfig::default();
        let is_owned = matches!(config.tokio, TokioConfig::Owned { .. });
        assert!(is_owned);
    }

    /// `with_tokio` swaps in a configuration that reuses an existing handle.
    #[test]
    fn test_runtime_config_existing_handle() {
        let rt = TokioRuntime::new().unwrap();
        let existing = TokioConfig::existing_handle(rt.handle().clone());
        let config = Runtime::test_config().with_tokio(existing);
        assert!(matches!(config.tokio, TokioConfig::ExistingHandle(_)));
    }

    /// The default thread count is never zero.
    #[cfg(feature = "rayon")]
    #[test]
    fn test_rayon_config_thread_count() {
        let threads = RayonConfig::default().default_thread_count();
        assert!(threads >= 1);
    }

    /// A runtime can be built on top of an existing tokio handle.
    #[test]
    fn test_runtime_builder() {
        let rt = TokioRuntime::new().unwrap();
        let existing = TokioConfig::existing_handle(rt.handle().clone());
        let config = Runtime::test_config().with_tokio(existing);
        let runtime = RuntimeBuilder::new(config).build().unwrap();
        let _ = runtime.handle();
    }

    /// The future passed to `spawn_critical_os_thread` runs on a thread with the requested
    /// name.
    #[test]
    fn critical_os_thread_uses_requested_name() {
        let runtime = Runtime::test();
        let (name_tx, name_rx) = std::sync::mpsc::channel();

        let join_handle = runtime.spawn_critical_os_thread(
            "critical-os-test",
            "critical os thread test",
            async move {
                let observed = thread::current().name().unwrap().to_string();
                name_tx.send(observed).unwrap();
            },
        );

        let observed = name_rx.recv_timeout(Duration::from_secs(5)).unwrap();
        assert_eq!(observed, "critical-os-test");
        join_handle.join().unwrap();
    }

    /// A panic on a critical OS thread makes the task manager resolve with an error that
    /// carries the task name and the panic message.
    #[test]
    fn critical_os_thread_panic_is_reported() {
        let runtime = Runtime::test();
        let manager_handle = runtime.take_task_manager_handle().unwrap();

        let join_handle = runtime.spawn_critical_os_thread(
            "critical-os-panic",
            "critical os thread panic test",
            async { panic!("critical os thread panic") },
        );

        let err =
            runtime.handle().block_on(async move { manager_handle.await.unwrap().unwrap_err() });
        assert_eq!(err.task_name, "critical os thread panic test");
        assert_eq!(err.error, Some("critical os thread panic".to_string()));
        join_handle.join().unwrap();
    }

    /// Worker pools report as uninitialized until they are first queried.
    #[cfg(feature = "rayon")]
    #[test]
    fn test_worker_pools_are_lazy() {
        let runtime = Runtime::test();

        // Nothing has touched the pools yet.
        assert!(!runtime.0.bal_streaming_pool.is_initialized());
        assert!(!runtime.0.proof_storage_worker_pool.is_initialized());

        // Asking for the thread count initializes the pool.
        assert_eq!(runtime.bal_streaming_pool().current_num_threads(), 2);
        assert!(runtime.0.bal_streaming_pool.is_initialized());

        assert_eq!(runtime.proof_storage_worker_pool().current_num_threads(), 2);
        assert_eq!(runtime.proof_account_worker_pool().current_num_threads(), 2);
        assert_eq!(runtime.prewarming_pool().current_num_threads(), 2);
    }
}