1#![cfg_attr(feature = "disable-lock", allow(dead_code))]
4
5use reth_storage_errors::lockfile::StorageLockError;
6use std::{
7 path::{Path, PathBuf},
8 process,
9 sync::{Arc, OnceLock},
10};
11use sysinfo::{ProcessRefreshKind, RefreshKind, System};
12
13const LOCKFILE_NAME: &str = "lock";
15
16#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct StorageLock(Arc<StorageLockInner>);
24
25impl StorageLock {
26 pub fn try_acquire(path: &Path) -> Result<Self, StorageLockError> {
32 #[cfg(feature = "disable-lock")]
33 {
34 let file_path = path.join(LOCKFILE_NAME);
35 Ok(Self(Arc::new(StorageLockInner { file_path })))
37 }
38
39 #[cfg(not(feature = "disable-lock"))]
40 Self::try_acquire_file_lock(path)
41 }
42
43 #[cfg(any(test, not(feature = "disable-lock")))]
45 fn try_acquire_file_lock(path: &Path) -> Result<Self, StorageLockError> {
46 let file_path = path.join(LOCKFILE_NAME);
47 if let Some(process_lock) = ProcessUID::parse(&file_path)? &&
48 process_lock.pid != (process::id() as usize) &&
49 process_lock.is_active()
50 {
51 reth_tracing::tracing::error!(
52 target: "reth::db::lockfile",
53 path = ?file_path,
54 pid = process_lock.pid,
55 start_time = process_lock.start_time,
56 "Storage lock already taken."
57 );
58 return Err(StorageLockError::Taken(process_lock.pid))
59 }
60
61 Ok(Self(Arc::new(StorageLockInner::new(file_path)?)))
62 }
63}
64
65impl Drop for StorageLockInner {
66 fn drop(&mut self) {
67 #[cfg(any(test, not(feature = "disable-lock")))]
69 {
70 let file_path = &self.file_path;
71 if file_path.exists() {
72 if let Ok(Some(process_uid)) = ProcessUID::parse(file_path) {
73 if process_uid.pid == process::id() as usize {
75 if let Err(err) = reth_fs_util::remove_file(file_path) {
76 reth_tracing::tracing::error!(%err, "Failed to delete lock file");
77 }
78 } else {
79 reth_tracing::tracing::warn!(
80 "Lock file belongs to different process (PID: {}), not removing",
81 process_uid.pid
82 );
83 }
84 } else {
85 if let Err(err) = reth_fs_util::remove_file(file_path) {
88 reth_tracing::tracing::error!(%err, "Failed to delete lock file");
89 }
90 }
91 }
92 }
93 }
94}
95
96#[derive(Debug, PartialEq, Eq)]
97struct StorageLockInner {
98 file_path: PathBuf,
99}
100
101impl StorageLockInner {
102 fn new(file_path: PathBuf) -> Result<Self, StorageLockError> {
104 if let Some(parent) = file_path.parent() {
106 reth_fs_util::create_dir_all(parent).map_err(StorageLockError::other)?;
107 }
108
109 ProcessUID::own().write(&file_path)?;
111
112 Ok(Self { file_path })
113 }
114}
115
116#[derive(Clone, Debug)]
117struct ProcessUID {
118 pid: usize,
120 start_time: u64,
122}
123
124impl ProcessUID {
125 fn new(pid: usize) -> Option<Self> {
127 let mut system = System::new();
128 let pid2 = sysinfo::Pid::from(pid);
129 system.refresh_processes_specifics(
130 sysinfo::ProcessesToUpdate::Some(&[pid2]),
131 true,
132 ProcessRefreshKind::nothing(),
133 );
134 system.process(pid2).map(|process| Self { pid, start_time: process.start_time() })
135 }
136
137 fn own() -> Self {
139 static CACHE: OnceLock<ProcessUID> = OnceLock::new();
140 CACHE.get_or_init(|| Self::new(process::id() as usize).expect("own process")).clone()
141 }
142
143 fn parse(path: &Path) -> Result<Option<Self>, StorageLockError> {
145 if path.exists() &&
146 let Ok(contents) = reth_fs_util::read_to_string(path)
147 {
148 let mut lines = contents.lines();
149 if let (Some(Ok(pid)), Some(Ok(start_time))) = (
150 lines.next().map(str::trim).map(str::parse),
151 lines.next().map(str::trim).map(str::parse),
152 ) {
153 return Ok(Some(Self { pid, start_time }));
154 }
155 }
156 Ok(None)
157 }
158
159 fn is_active(&self) -> bool {
161 System::new_with_specifics(
162 RefreshKind::nothing().with_processes(ProcessRefreshKind::nothing()),
163 )
164 .process(self.pid.into())
165 .is_some_and(|p| p.start_time() == self.start_time)
166 }
167
168 fn write(&self, path: &Path) -> Result<(), StorageLockError> {
170 reth_fs_util::write(path, format!("{}\n{}", self.pid, self.start_time))
171 .map_err(StorageLockError::other)
172 }
173}
174
175#[cfg(test)]
176mod tests {
177 use super::*;
178 use std::sync::{Mutex, MutexGuard, OnceLock};
179
180 static SERIAL: OnceLock<Mutex<()>> = OnceLock::new();
182
183 fn serial_lock() -> MutexGuard<'static, ()> {
184 SERIAL.get_or_init(|| Mutex::new(())).lock().unwrap()
185 }
186
187 #[test]
188 fn test_lock() {
189 let _guard = serial_lock();
190
191 let temp_dir = tempfile::tempdir().unwrap();
192
193 let lock = StorageLock::try_acquire_file_lock(temp_dir.path()).unwrap();
194
195 assert_eq!(Ok(lock.clone()), StorageLock::try_acquire_file_lock(temp_dir.path()));
197
198 let lock_file = temp_dir.path().join(LOCKFILE_NAME);
200 let mut fake_pid = 1337;
201 let system = System::new_all();
202 while system.process(fake_pid.into()).is_some() {
203 fake_pid += 1;
204 }
205 ProcessUID { pid: fake_pid, start_time: u64::MAX }.write(&lock_file).unwrap();
206 assert_eq!(Ok(lock.clone()), StorageLock::try_acquire_file_lock(temp_dir.path()));
207
208 let mut pid_1 = ProcessUID::new(1).unwrap();
209
210 pid_1.write(&lock_file).unwrap();
212 assert_eq!(
213 Err(StorageLockError::Taken(1)),
214 StorageLock::try_acquire_file_lock(temp_dir.path())
215 );
216
217 pid_1.start_time += 1;
219 pid_1.write(&lock_file).unwrap();
220 assert_eq!(Ok(lock), StorageLock::try_acquire_file_lock(temp_dir.path()));
221 }
222
223 #[test]
224 fn test_drop_lock() {
225 let _guard = serial_lock();
226
227 let temp_dir = tempfile::tempdir().unwrap();
228 let lock_file = temp_dir.path().join(LOCKFILE_NAME);
229
230 let lock = StorageLock::try_acquire_file_lock(temp_dir.path()).unwrap();
231
232 assert!(lock_file.exists());
233 drop(lock);
234 assert!(!lock_file.exists());
235 }
236}