reth_db/
lockfile.rs
1#![cfg_attr(feature = "disable-lock", allow(dead_code))]
4
5use reth_storage_errors::lockfile::StorageLockError;
6use std::{
7 path::{Path, PathBuf},
8 process,
9 sync::{Arc, OnceLock},
10};
11use sysinfo::{ProcessRefreshKind, RefreshKind, System};
12
13const LOCKFILE_NAME: &str = "lock";
15
16#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct StorageLock(Arc<StorageLockInner>);
24
25impl StorageLock {
26 pub fn try_acquire(path: &Path) -> Result<Self, StorageLockError> {
32 #[cfg(feature = "disable-lock")]
33 {
34 let file_path = path.join(LOCKFILE_NAME);
35 Ok(Self(Arc::new(StorageLockInner { file_path })))
37 }
38
39 #[cfg(not(feature = "disable-lock"))]
40 Self::try_acquire_file_lock(path)
41 }
42
43 #[cfg(any(test, not(feature = "disable-lock")))]
45 fn try_acquire_file_lock(path: &Path) -> Result<Self, StorageLockError> {
46 let file_path = path.join(LOCKFILE_NAME);
47 if let Some(process_lock) = ProcessUID::parse(&file_path)? {
48 if process_lock.pid != (process::id() as usize) && process_lock.is_active() {
49 reth_tracing::tracing::error!(
50 target: "reth::db::lockfile",
51 path = ?file_path,
52 pid = process_lock.pid,
53 start_time = process_lock.start_time,
54 "Storage lock already taken."
55 );
56 return Err(StorageLockError::Taken(process_lock.pid))
57 }
58 }
59
60 Ok(Self(Arc::new(StorageLockInner::new(file_path)?)))
61 }
62}
63
64impl Drop for StorageLock {
65 fn drop(&mut self) {
66 #[cfg(any(test, not(feature = "disable-lock")))]
68 if Arc::strong_count(&self.0) == 1 && self.0.file_path.exists() {
69 if let Err(err) = reth_fs_util::remove_file(&self.0.file_path) {
73 reth_tracing::tracing::error!(%err, "Failed to delete lock file");
74 }
75 }
76 }
77}
78
79#[derive(Debug, PartialEq, Eq)]
80struct StorageLockInner {
81 file_path: PathBuf,
82}
83
84impl StorageLockInner {
85 fn new(file_path: PathBuf) -> Result<Self, StorageLockError> {
87 if let Some(parent) = file_path.parent() {
89 reth_fs_util::create_dir_all(parent).map_err(StorageLockError::other)?;
90 }
91
92 ProcessUID::own().write(&file_path)?;
94
95 Ok(Self { file_path })
96 }
97}
98
99#[derive(Clone, Debug)]
100struct ProcessUID {
101 pid: usize,
103 start_time: u64,
105}
106
107impl ProcessUID {
108 fn new(pid: usize) -> Option<Self> {
110 let mut system = System::new();
111 let pid2 = sysinfo::Pid::from(pid);
112 system.refresh_processes_specifics(
113 sysinfo::ProcessesToUpdate::Some(&[pid2]),
114 true,
115 ProcessRefreshKind::nothing(),
116 );
117 system.process(pid2).map(|process| Self { pid, start_time: process.start_time() })
118 }
119
120 fn own() -> Self {
122 static CACHE: OnceLock<ProcessUID> = OnceLock::new();
123 CACHE.get_or_init(|| Self::new(process::id() as usize).expect("own process")).clone()
124 }
125
126 fn parse(path: &Path) -> Result<Option<Self>, StorageLockError> {
128 if path.exists() {
129 if let Ok(contents) = reth_fs_util::read_to_string(path) {
130 let mut lines = contents.lines();
131 if let (Some(Ok(pid)), Some(Ok(start_time))) = (
132 lines.next().map(str::trim).map(str::parse),
133 lines.next().map(str::trim).map(str::parse),
134 ) {
135 return Ok(Some(Self { pid, start_time }));
136 }
137 }
138 }
139 Ok(None)
140 }
141
142 fn is_active(&self) -> bool {
144 System::new_with_specifics(
145 RefreshKind::nothing().with_processes(ProcessRefreshKind::nothing()),
146 )
147 .process(self.pid.into())
148 .is_some_and(|p| p.start_time() == self.start_time)
149 }
150
151 fn write(&self, path: &Path) -> Result<(), StorageLockError> {
153 reth_fs_util::write(path, format!("{}\n{}", self.pid, self.start_time))
154 .map_err(StorageLockError::other)
155 }
156}
157
158#[cfg(test)]
159mod tests {
160 use super::*;
161 use std::sync::{Mutex, MutexGuard, OnceLock};
162
163 static SERIAL: OnceLock<Mutex<()>> = OnceLock::new();
165
166 fn serial_lock() -> MutexGuard<'static, ()> {
167 SERIAL.get_or_init(|| Mutex::new(())).lock().unwrap()
168 }
169
170 #[test]
171 fn test_lock() {
172 let _guard = serial_lock();
173
174 let temp_dir = tempfile::tempdir().unwrap();
175
176 let lock = StorageLock::try_acquire_file_lock(temp_dir.path()).unwrap();
177
178 assert_eq!(Ok(lock.clone()), StorageLock::try_acquire_file_lock(temp_dir.path()));
180
181 let lock_file = temp_dir.path().join(LOCKFILE_NAME);
183 let mut fake_pid = 1337;
184 let system = System::new_all();
185 while system.process(fake_pid.into()).is_some() {
186 fake_pid += 1;
187 }
188 ProcessUID { pid: fake_pid, start_time: u64::MAX }.write(&lock_file).unwrap();
189 assert_eq!(Ok(lock.clone()), StorageLock::try_acquire_file_lock(temp_dir.path()));
190
191 let mut pid_1 = ProcessUID::new(1).unwrap();
192
193 pid_1.write(&lock_file).unwrap();
195 assert_eq!(
196 Err(StorageLockError::Taken(1)),
197 StorageLock::try_acquire_file_lock(temp_dir.path())
198 );
199
200 pid_1.start_time += 1;
202 pid_1.write(&lock_file).unwrap();
203 assert_eq!(Ok(lock), StorageLock::try_acquire_file_lock(temp_dir.path()));
204 }
205
206 #[test]
207 fn test_drop_lock() {
208 let _guard = serial_lock();
209
210 let temp_dir = tempfile::tempdir().unwrap();
211 let lock_file = temp_dir.path().join(LOCKFILE_NAME);
212
213 let lock = StorageLock::try_acquire_file_lock(temp_dir.path()).unwrap();
214
215 assert!(lock_file.exists());
216 drop(lock);
217 assert!(!lock_file.exists());
218 }
219}