reth_db/
lockfile.rs

1//! Storage lock utils.
2
3#![cfg_attr(feature = "disable-lock", allow(dead_code))]
4
5use reth_storage_errors::lockfile::StorageLockError;
6use std::{
7    path::{Path, PathBuf},
8    process,
9    sync::{Arc, OnceLock},
10};
11use sysinfo::{ProcessRefreshKind, RefreshKind, System};
12
/// File name of the lock file that is created inside the locked storage directory.
const LOCKFILE_NAME: &str = "lock";
15
/// A file lock for a storage directory to ensure exclusive read-write access across different
/// processes.
///
/// The lock file (named [`LOCKFILE_NAME`] inside the storage directory) stores the PID and the
/// start time of the process holding it. It is released (deleted) when the last clone of this
/// lock is dropped, e.g. on a graceful shutdown. On resuming from a crash, the stored PID and
/// start time are used to check whether another live process still holds the lock.
///
/// Cloning is cheap: all clones share the same inner state through an [`Arc`]. Equality compares
/// that inner state (the lock file path), not pointer identity.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageLock(Arc<StorageLockInner>);
24
25impl StorageLock {
26    /// Tries to acquire a write lock on the target directory, returning [`StorageLockError`] if
27    /// unsuccessful.
28    ///
29    /// Note: In-process exclusivity is not on scope. If called from the same process (or another
30    /// with the same PID), it will succeed.
31    pub fn try_acquire(path: &Path) -> Result<Self, StorageLockError> {
32        #[cfg(feature = "disable-lock")]
33        {
34            let file_path = path.join(LOCKFILE_NAME);
35            // Too expensive for ef-tests to write/read lock to/from disk.
36            Ok(Self(Arc::new(StorageLockInner { file_path })))
37        }
38
39        #[cfg(not(feature = "disable-lock"))]
40        Self::try_acquire_file_lock(path)
41    }
42
43    /// Acquire a file write lock.
44    #[cfg(any(test, not(feature = "disable-lock")))]
45    fn try_acquire_file_lock(path: &Path) -> Result<Self, StorageLockError> {
46        let file_path = path.join(LOCKFILE_NAME);
47        if let Some(process_lock) = ProcessUID::parse(&file_path)? {
48            if process_lock.pid != (process::id() as usize) && process_lock.is_active() {
49                reth_tracing::tracing::error!(
50                    target: "reth::db::lockfile",
51                    path = ?file_path,
52                    pid = process_lock.pid,
53                    start_time = process_lock.start_time,
54                    "Storage lock already taken."
55                );
56                return Err(StorageLockError::Taken(process_lock.pid))
57            }
58        }
59
60        Ok(Self(Arc::new(StorageLockInner::new(file_path)?)))
61    }
62}
63
64impl Drop for StorageLockInner {
65    fn drop(&mut self) {
66        // The lockfile is not created in disable-lock mode, so we don't need to delete it.
67        #[cfg(any(test, not(feature = "disable-lock")))]
68        {
69            let file_path = &self.file_path;
70            if file_path.exists() {
71                if let Ok(Some(process_uid)) = ProcessUID::parse(file_path) {
72                    // Only remove if the lock file belongs to our process
73                    if process_uid.pid == process::id() as usize {
74                        if let Err(err) = reth_fs_util::remove_file(file_path) {
75                            reth_tracing::tracing::error!(%err, "Failed to delete lock file");
76                        }
77                    } else {
78                        reth_tracing::tracing::warn!(
79                            "Lock file belongs to different process (PID: {}), not removing",
80                            process_uid.pid
81                        );
82                    }
83                } else {
84                    // If we can't parse the lock file, still try to remove it
85                    // as it might be corrupted or from a previous run
86                    if let Err(err) = reth_fs_util::remove_file(file_path) {
87                        reth_tracing::tracing::error!(%err, "Failed to delete lock file");
88                    }
89                }
90            }
91        }
92    }
93}
94
/// State shared by all clones of a [`StorageLock`].
///
/// Its `Drop` impl runs once the last clone is gone and (except in `disable-lock` builds outside
/// tests) removes the lock file if it still records this process or cannot be parsed.
#[derive(Debug, PartialEq, Eq)]
struct StorageLockInner {
    /// Path of the lock file: the storage directory joined with [`LOCKFILE_NAME`].
    file_path: PathBuf,
}
99
100impl StorageLockInner {
101    /// Creates lock file and writes this process PID into it.
102    fn new(file_path: PathBuf) -> Result<Self, StorageLockError> {
103        // Create the directory if it doesn't exist
104        if let Some(parent) = file_path.parent() {
105            reth_fs_util::create_dir_all(parent).map_err(StorageLockError::other)?;
106        }
107
108        // Write this process unique identifier (pid & start_time) to file
109        ProcessUID::own().write(&file_path)?;
110
111        Ok(Self { file_path })
112    }
113}
114
/// Identity of a specific process instance, as stored in the lock file.
///
/// The start time is kept next to the PID so that a lock left behind by a dead process is not
/// mistaken for a live one when its PID has been reused (see `is_active`).
#[derive(Clone, Debug)]
struct ProcessUID {
    /// OS process identifier
    pid: usize,
    /// Process start time, as reported by `sysinfo`
    start_time: u64,
}
122
123impl ProcessUID {
124    /// Creates [`Self`] for the provided PID.
125    fn new(pid: usize) -> Option<Self> {
126        let mut system = System::new();
127        let pid2 = sysinfo::Pid::from(pid);
128        system.refresh_processes_specifics(
129            sysinfo::ProcessesToUpdate::Some(&[pid2]),
130            true,
131            ProcessRefreshKind::nothing(),
132        );
133        system.process(pid2).map(|process| Self { pid, start_time: process.start_time() })
134    }
135
136    /// Creates [`Self`] from own process.
137    fn own() -> Self {
138        static CACHE: OnceLock<ProcessUID> = OnceLock::new();
139        CACHE.get_or_init(|| Self::new(process::id() as usize).expect("own process")).clone()
140    }
141
142    /// Parses [`Self`] from a file.
143    fn parse(path: &Path) -> Result<Option<Self>, StorageLockError> {
144        if path.exists() {
145            if let Ok(contents) = reth_fs_util::read_to_string(path) {
146                let mut lines = contents.lines();
147                if let (Some(Ok(pid)), Some(Ok(start_time))) = (
148                    lines.next().map(str::trim).map(str::parse),
149                    lines.next().map(str::trim).map(str::parse),
150                ) {
151                    return Ok(Some(Self { pid, start_time }));
152                }
153            }
154        }
155        Ok(None)
156    }
157
158    /// Whether a process with this `pid` and `start_time` exists.
159    fn is_active(&self) -> bool {
160        System::new_with_specifics(
161            RefreshKind::nothing().with_processes(ProcessRefreshKind::nothing()),
162        )
163        .process(self.pid.into())
164        .is_some_and(|p| p.start_time() == self.start_time)
165    }
166
167    /// Writes `pid` and `start_time` to a file.
168    fn write(&self, path: &Path) -> Result<(), StorageLockError> {
169        reth_fs_util::write(path, format!("{}\n{}", self.pid, self.start_time))
170            .map_err(StorageLockError::other)
171    }
172}
173
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Mutex used to run the lock tests below serially.
    static SERIAL: Mutex<()> = Mutex::new(());

    /// Acquires the serialization lock.
    ///
    /// If an earlier test panicked while holding the lock, the mutex is poisoned. The guard is
    /// recovered anyway, so one failing test does not make every later test fail with an
    /// unrelated `PoisonError`.
    fn serial_lock() -> MutexGuard<'static, ()> {
        SERIAL.lock().unwrap_or_else(PoisonError::into_inner)
    }

    #[test]
    fn test_lock() {
        let _guard = serial_lock();

        let temp_dir = tempfile::tempdir().unwrap();

        let lock = StorageLock::try_acquire_file_lock(temp_dir.path()).unwrap();

        // Same process can re-acquire the lock
        assert_eq!(Ok(lock.clone()), StorageLock::try_acquire_file_lock(temp_dir.path()));

        // A lock of a non existent PID can be acquired.
        let lock_file = temp_dir.path().join(LOCKFILE_NAME);
        let mut fake_pid = 1337;
        let system = System::new_all();
        while system.process(fake_pid.into()).is_some() {
            fake_pid += 1;
        }
        ProcessUID { pid: fake_pid, start_time: u64::MAX }.write(&lock_file).unwrap();
        assert_eq!(Ok(lock.clone()), StorageLock::try_acquire_file_lock(temp_dir.path()));

        let mut pid_1 = ProcessUID::new(1).unwrap();

        // If a parsed `ProcessUID` exists, the lock can NOT be acquired.
        pid_1.write(&lock_file).unwrap();
        assert_eq!(
            Err(StorageLockError::Taken(1)),
            StorageLock::try_acquire_file_lock(temp_dir.path())
        );

        // A lock of a different but existing PID can be acquired ONLY IF the start_time differs.
        pid_1.start_time += 1;
        pid_1.write(&lock_file).unwrap();
        assert_eq!(Ok(lock), StorageLock::try_acquire_file_lock(temp_dir.path()));
    }

    #[test]
    fn test_drop_lock() {
        let _guard = serial_lock();

        let temp_dir = tempfile::tempdir().unwrap();
        let lock_file = temp_dir.path().join(LOCKFILE_NAME);

        let lock = StorageLock::try_acquire_file_lock(temp_dir.path()).unwrap();

        assert!(lock_file.exists());
        drop(lock);
        assert!(!lock_file.exists());
    }
}