Skip to main content

reth_tasks/
lazy.rs

1//! A lazily-resolved handle to a value computed on a background thread.
2
3use parking_lot::Mutex;
4use std::sync::{Arc, OnceLock};
5use tokio::sync::oneshot;
6
7/// Handle to a value computed on a background thread.
8///
9/// The computation is spawned immediately on creation and runs concurrently.
10/// The result is resolved on first access via [`Self::get`] and cached in a
11/// [`OnceLock`] for subsequent calls.
12///
13/// This type is cheaply cloneable via internal [`Arc`].
14///
15/// Create via [`Runtime::spawn_blocking_named`](crate::Runtime::spawn_blocking_named).
16#[derive(Clone)]
17pub struct LazyHandle<T> {
18    inner: Arc<LazyHandleInner<T>>,
19}
20
/// State shared (through an [`Arc`]) by a [`LazyHandle`] and all of its clones.
struct LazyHandleInner<T> {
    /// Pending receiver, taken on first access.
    ///
    /// Holds `None` once `get` has taken the receiver, and from the start for
    /// handles built with `ready`, which never have a receiver.
    rx: Mutex<Option<oneshot::Receiver<T>>>,
    /// Cached result after the first successful receive.
    ///
    /// Pre-populated for handles built with `ready`.
    value: OnceLock<T>,
}
27
28impl<T: Send + 'static> LazyHandle<T> {
29    /// Creates a new handle from a background task receiver.
30    pub(crate) fn new(rx: oneshot::Receiver<T>) -> Self {
31        Self {
32            inner: Arc::new(LazyHandleInner { rx: Mutex::new(Some(rx)), value: OnceLock::new() }),
33        }
34    }
35
36    /// Creates a handle that is already resolved with the given value.
37    pub fn ready(value: T) -> Self {
38        let inner = LazyHandleInner { rx: Mutex::new(None), value: OnceLock::from(value) };
39        Self { inner: Arc::new(inner) }
40    }
41
42    /// Blocks until the background task completes and returns a reference to the result.
43    ///
44    /// On the first call this awaits the receiver; subsequent calls return the cached value
45    /// without blocking.
46    ///
47    /// # Panics
48    ///
49    /// Panics if the background task was dropped without producing a value.
50    pub fn get(&self) -> &T {
51        self.inner.value.get_or_init(|| {
52            let rx = self
53                .inner
54                .rx
55                .lock()
56                .take()
57                .expect("LazyHandle receiver already taken without value being set");
58            rx.blocking_recv().expect("LazyHandle task dropped without producing a value")
59        })
60    }
61
62    /// Consumes the handle and returns the inner value if this is the only handle.
63    ///
64    /// Returns `Err(self)` if other clones exist, so the caller can fall back
65    /// to a reference-based path (e.g. `clone_into_sorted` instead of `into_sorted`).
66    ///
67    /// Blocks if the background task hasn't completed yet.
68    pub fn try_into_inner(self) -> Result<T, Self> {
69        self.get();
70        match Arc::try_unwrap(self.inner) {
71            Ok(inner) => Ok(inner.value.into_inner().expect("value was just set by get()")),
72            Err(arc) => Err(Self { inner: arc }),
73        }
74    }
75}
76
77impl<T: Send + std::fmt::Debug + 'static> std::fmt::Debug for LazyHandle<T> {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        let mut s = f.debug_struct("LazyHandle");
80        if let Some(value) = self.inner.value.get() {
81            s.field("value", value);
82        } else {
83            s.field("value", &"<pending>");
84        }
85        s.finish()
86    }
87}
88
#[cfg(test)]
mod tests {
    use super::*;

    /// A value sent before the first access is returned by `get` and then cached.
    #[test]
    fn test_lazy_handle_resolves() {
        let (tx, rx) = oneshot::channel();
        let handle = LazyHandle::new(rx);
        tx.send(42u64).unwrap();
        assert_eq!(*handle.get(), 42);
        // subsequent calls return cached value
        assert_eq!(*handle.get(), 42);
    }

    /// Clones share the same state, so both observe the single sent value.
    #[test]
    fn test_lazy_handle_clone_shares_value() {
        let (tx, rx) = oneshot::channel();
        let handle = LazyHandle::new(rx);
        let handle2 = handle.clone();
        tx.send(99u64).unwrap();
        assert_eq!(*handle.get(), 99);
        assert_eq!(*handle2.get(), 99);
    }

    /// A handle built with `ready` is resolved from the start and can be unwrapped.
    #[test]
    fn test_lazy_handle_ready() {
        let handle = LazyHandle::ready(7u64);
        assert_eq!(*handle.get(), 7);
        assert_eq!(handle.try_into_inner().unwrap(), 7);
    }

    /// The sole handle can be converted into the owned value.
    #[test]
    fn test_lazy_handle_try_into_inner() {
        let (tx, rx) = oneshot::channel();
        let handle = LazyHandle::new(rx);
        tx.send(String::from("hello")).unwrap();
        assert_eq!(handle.try_into_inner().unwrap(), "hello");
    }

    /// While another clone is alive, `try_into_inner` hands the handle back.
    #[test]
    fn test_lazy_handle_try_into_inner_returns_self_with_clone() {
        let (tx, rx) = oneshot::channel();
        let handle = LazyHandle::new(rx);
        let _clone = handle.clone();
        tx.send(String::from("hello")).unwrap();
        let handle = handle.try_into_inner().unwrap_err();
        assert_eq!(*handle.get(), "hello");
    }

    /// `Debug` shows a placeholder while pending and the value once resolved,
    /// without resolving the handle itself.
    #[test]
    fn test_lazy_handle_debug() {
        let (tx, rx) = oneshot::channel();
        let handle = LazyHandle::new(rx);
        assert_eq!(format!("{handle:?}"), "LazyHandle { value: \"<pending>\" }");
        tx.send(42u64).unwrap();
        // Formatting must not have consumed the receiver.
        assert_eq!(*handle.get(), 42);
        assert_eq!(format!("{handle:?}"), "LazyHandle { value: 42 }");
    }

    /// Dropping the sender without sending makes `get` panic.
    #[test]
    #[should_panic(expected = "LazyHandle task dropped without producing a value")]
    fn test_lazy_handle_panics_on_dropped_sender() {
        let (tx, rx) = oneshot::channel::<u64>();
        let handle = LazyHandle::new(rx);
        drop(tx);
        handle.get();
    }
}