Skip to main content

reth_tasks/
lazy.rs

1//! A lazily-resolved handle to a value computed on a background thread.
2
3use std::sync::{Arc, OnceLock};
4use tokio::sync::oneshot;
5
6/// Handle to a value computed on a background thread.
7///
8/// The computation is spawned immediately on creation and runs concurrently.
9/// The result is resolved on first access via [`Self::get`] and cached in a
10/// [`OnceLock`] for subsequent calls.
11///
12/// This type is cheaply cloneable via internal [`Arc`].
13///
14/// Create via [`Runtime::spawn_blocking_named`](crate::Runtime::spawn_blocking_named).
15#[derive(Clone)]
16pub struct LazyHandle<T> {
17    inner: Arc<LazyHandleInner<T>>,
18}
19
20struct LazyHandleInner<T> {
21    /// Pending receiver, taken on first access.
22    rx: std::sync::Mutex<Option<oneshot::Receiver<T>>>,
23    /// Cached result after the first successful receive.
24    value: OnceLock<T>,
25}
26
27impl<T: Send + 'static> LazyHandle<T> {
28    /// Creates a new handle from a background task receiver.
29    pub(crate) fn new(rx: oneshot::Receiver<T>) -> Self {
30        Self {
31            inner: Arc::new(LazyHandleInner {
32                rx: std::sync::Mutex::new(Some(rx)),
33                value: OnceLock::new(),
34            }),
35        }
36    }
37
38    /// Creates a handle that is already resolved with the given value.
39    pub fn ready(value: T) -> Self {
40        let inner =
41            LazyHandleInner { rx: std::sync::Mutex::new(None), value: OnceLock::from(value) };
42        Self { inner: Arc::new(inner) }
43    }
44
45    /// Blocks until the background task completes and returns a reference to the result.
46    ///
47    /// On the first call this awaits the receiver; subsequent calls return the cached value
48    /// without blocking.
49    ///
50    /// # Panics
51    ///
52    /// Panics if the background task was dropped without producing a value.
53    pub fn get(&self) -> &T {
54        self.inner.value.get_or_init(|| {
55            let rx = self
56                .inner
57                .rx
58                .lock()
59                .expect("lock poisoned")
60                .take()
61                .expect("LazyHandle receiver already taken without value being set");
62            rx.blocking_recv().expect("LazyHandle task dropped without producing a value")
63        })
64    }
65
66    /// Consumes the handle and returns the inner value if this is the only handle.
67    ///
68    /// Returns `Err(self)` if other clones exist, so the caller can fall back
69    /// to a reference-based path (e.g. `clone_into_sorted` instead of `into_sorted`).
70    ///
71    /// Blocks if the background task hasn't completed yet.
72    pub fn try_into_inner(self) -> Result<T, Self> {
73        self.get();
74        match Arc::try_unwrap(self.inner) {
75            Ok(inner) => Ok(inner.value.into_inner().expect("value was just set by get()")),
76            Err(arc) => Err(Self { inner: arc }),
77        }
78    }
79}
80
81impl<T: Send + std::fmt::Debug + 'static> std::fmt::Debug for LazyHandle<T> {
82    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        let mut s = f.debug_struct("LazyHandle");
84        if let Some(value) = self.inner.value.get() {
85            s.field("value", value);
86        } else {
87            s.field("value", &"<pending>");
88        }
89        s.finish()
90    }
91}
92
#[cfg(test)]
mod tests {
    use super::*;

    /// A value sent before the first `get` is returned, and returned again from the cache.
    #[test]
    fn test_lazy_handle_resolves() {
        let (tx, rx) = oneshot::channel();
        let handle = LazyHandle::new(rx);
        tx.send(42u64).unwrap();
        assert_eq!(*handle.get(), 42);
        // subsequent calls return cached value
        assert_eq!(*handle.get(), 42);
    }

    /// Clones observe the same resolved value as the handle they were cloned from.
    #[test]
    fn test_lazy_handle_clone_shares_value() {
        let (tx, rx) = oneshot::channel();
        let handle = LazyHandle::new(rx);
        let handle2 = handle.clone();
        tx.send(99u64).unwrap();
        assert_eq!(*handle.get(), 99);
        assert_eq!(*handle2.get(), 99);
    }

    /// A sole handle can be unwrapped into the owned value.
    #[test]
    fn test_lazy_handle_try_into_inner() {
        let (tx, rx) = oneshot::channel();
        let handle = LazyHandle::new(rx);
        tx.send(String::from("hello")).unwrap();
        assert_eq!(handle.try_into_inner().unwrap(), "hello");
    }

    /// While a clone is alive the handle is handed back and remains usable.
    #[test]
    fn test_lazy_handle_try_into_inner_returns_self_with_clone() {
        let (tx, rx) = oneshot::channel();
        let handle = LazyHandle::new(rx);
        let _clone = handle.clone();
        tx.send(String::from("hello")).unwrap();
        let handle = handle.try_into_inner().unwrap_err();
        assert_eq!(*handle.get(), "hello");
    }

    /// Dropping the sender without sending makes `get` panic with a descriptive message.
    #[test]
    #[should_panic(expected = "LazyHandle task dropped without producing a value")]
    fn test_lazy_handle_panics_on_dropped_sender() {
        let (tx, rx) = oneshot::channel::<u64>();
        let handle = LazyHandle::new(rx);
        drop(tx);
        handle.get();
    }

    /// A handle built with `ready` needs no sender and resolves immediately.
    #[test]
    fn test_lazy_handle_ready_is_already_resolved() {
        let handle = LazyHandle::ready(7u64);
        assert_eq!(*handle.get(), 7);
        assert_eq!(handle.try_into_inner().unwrap(), 7);
    }

    /// `Debug` prints a placeholder while pending and the value once resolved.
    #[test]
    fn test_lazy_handle_debug_reports_pending_then_value() {
        let (tx, rx) = oneshot::channel::<u64>();
        let handle = LazyHandle::new(rx);
        assert_eq!(format!("{:?}", handle), "LazyHandle { value: \"<pending>\" }");
        tx.send(42).unwrap();
        handle.get();
        assert_eq!(format!("{:?}", handle), "LazyHandle { value: 42 }");
    }
}