reth_provider/providers/static_file/
writer.rsuse super::{
manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
};
use crate::providers::static_file::metrics::StaticFileProviderOperation;
use alloy_consensus::BlockHeader;
use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
use reth_codecs::Compact;
use reth_db_api::models::CompactU256;
use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
use reth_node_types::NodePrimitives;
use reth_primitives::{
static_file::{SegmentHeader, SegmentRangeInclusive},
StaticFileSegment,
};
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use std::{
borrow::Borrow,
fmt::Debug,
path::{Path, PathBuf},
sync::{Arc, Weak},
time::Instant,
};
use tracing::debug;
#[derive(Debug)]
pub(crate) struct StaticFileWriters<N> {
headers: RwLock<Option<StaticFileProviderRW<N>>>,
transactions: RwLock<Option<StaticFileProviderRW<N>>>,
receipts: RwLock<Option<StaticFileProviderRW<N>>>,
}
impl<N> Default for StaticFileWriters<N> {
fn default() -> Self {
Self {
headers: Default::default(),
transactions: Default::default(),
receipts: Default::default(),
}
}
}
impl<N: NodePrimitives> StaticFileWriters<N> {
pub(crate) fn get_or_create(
&self,
segment: StaticFileSegment,
create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
let mut write_guard = match segment {
StaticFileSegment::Headers => self.headers.write(),
StaticFileSegment::Transactions => self.transactions.write(),
StaticFileSegment::Receipts => self.receipts.write(),
};
if write_guard.is_none() {
*write_guard = Some(create_fn()?);
}
Ok(StaticFileProviderRWRefMut(write_guard))
}
pub(crate) fn commit(&self) -> ProviderResult<()> {
for writer_lock in [&self.headers, &self.transactions, &self.receipts] {
let mut writer = writer_lock.write();
if let Some(writer) = writer.as_mut() {
writer.commit()?;
}
}
Ok(())
}
}
#[derive(Debug)]
pub struct StaticFileProviderRWRefMut<'a, N>(
pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
);
impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0.as_mut().expect("static file writer provider should be init")
}
}
impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
type Target = StaticFileProviderRW<N>;
fn deref(&self) -> &Self::Target {
self.0.as_ref().expect("static file writer provider should be init")
}
}
#[derive(Debug)]
pub struct StaticFileProviderRW<N> {
reader: Weak<StaticFileProviderInner<N>>,
writer: NippyJarWriter<SegmentHeader>,
data_path: PathBuf,
buf: Vec<u8>,
metrics: Option<Arc<StaticFileProviderMetrics>>,
prune_on_commit: Option<(u64, Option<BlockNumber>)>,
}
impl<N: NodePrimitives> StaticFileProviderRW<N> {
pub fn new(
segment: StaticFileSegment,
block: BlockNumber,
reader: Weak<StaticFileProviderInner<N>>,
metrics: Option<Arc<StaticFileProviderMetrics>>,
) -> ProviderResult<Self> {
let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
let mut writer = Self {
writer,
data_path,
buf: Vec::with_capacity(100),
reader,
metrics,
prune_on_commit: None,
};
writer.ensure_end_range_consistency()?;
Ok(writer)
}
fn open(
segment: StaticFileSegment,
block: u64,
reader: Weak<StaticFileProviderInner<N>>,
metrics: Option<Arc<StaticFileProviderMetrics>>,
) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
let start = Instant::now();
let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);
let block_range = static_file_provider.find_fixed_range(block);
let (jar, path) = match static_file_provider.get_segment_provider_from_block(
segment,
block_range.start(),
None,
) {
Ok(provider) => (
NippyJar::load(provider.data_path())
.map_err(|e| ProviderError::NippyJar(e.to_string()))?,
provider.data_path().into(),
),
Err(ProviderError::MissingStaticFileBlock(_, _)) => {
let path = static_file_provider.directory().join(segment.filename(&block_range));
(create_jar(segment, &path, block_range), path)
}
Err(err) => return Err(err),
};
let result = match NippyJarWriter::new(jar) {
Ok(writer) => Ok((writer, path)),
Err(NippyJarError::FrozenJar) => {
Err(ProviderError::FinalizedStaticFile(segment, block))
}
Err(e) => Err(ProviderError::NippyJar(e.to_string())),
}?;
if let Some(metrics) = &metrics {
metrics.record_segment_operation(
segment,
StaticFileProviderOperation::OpenWriter,
Some(start.elapsed()),
);
}
Ok(result)
}
fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
let expected_rows = if self.user_header().segment().is_headers() {
self.user_header().block_len().unwrap_or_default()
} else {
self.user_header().tx_len().unwrap_or_default()
};
let pruned_rows = expected_rows - self.writer.rows() as u64;
if pruned_rows > 0 {
self.user_header_mut().prune(pruned_rows);
}
self.writer.commit().map_err(|error| ProviderError::NippyJar(error.to_string()))?;
self.update_index()?;
Ok(())
}
pub fn commit(&mut self) -> ProviderResult<()> {
let start = Instant::now();
if let Some((to_delete, last_block_number)) = self.prune_on_commit.take() {
match self.writer.user_header().segment() {
StaticFileSegment::Headers => self.prune_header_data(to_delete)?,
StaticFileSegment::Transactions => self
.prune_transaction_data(to_delete, last_block_number.expect("should exist"))?,
StaticFileSegment::Receipts => {
self.prune_receipt_data(to_delete, last_block_number.expect("should exist"))?
}
}
}
if self.writer.is_dirty() {
self.writer.commit().map_err(|e| ProviderError::NippyJar(e.to_string()))?;
if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
self.writer.user_header().segment(),
StaticFileProviderOperation::CommitWriter,
Some(start.elapsed()),
);
}
debug!(
target: "provider::static_file",
segment = ?self.writer.user_header().segment(),
path = ?self.data_path,
duration = ?start.elapsed(),
"Commit"
);
self.update_index()?;
}
Ok(())
}
#[cfg(feature = "test-utils")]
pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
let start = Instant::now();
self.writer
.commit_without_sync_all()
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
self.writer.user_header().segment(),
StaticFileProviderOperation::CommitWriter,
Some(start.elapsed()),
);
}
debug!(
target: "provider::static_file",
segment = ?self.writer.user_header().segment(),
path = ?self.data_path,
duration = ?start.elapsed(),
"Commit"
);
self.update_index()?;
Ok(())
}
fn update_index(&self) -> ProviderResult<()> {
let segment_max_block = self
.writer
.user_header()
.block_range()
.as_ref()
.map(|block_range| block_range.end())
.or_else(|| {
(self.writer.user_header().expected_block_start() > 0)
.then(|| self.writer.user_header().expected_block_start() - 1)
});
self.reader().update_index(self.writer.user_header().segment(), segment_max_block)
}
pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
let segment = self.writer.user_header().segment();
self.check_next_block_number(expected_block_number)?;
let start = Instant::now();
if let Some(last_block) = self.writer.user_header().block_end() {
if last_block == self.writer.user_header().expected_block_end() {
self.commit()?;
let (writer, data_path) =
Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
self.writer = writer;
self.data_path = data_path;
*self.writer.user_header_mut() = SegmentHeader::new(
self.reader().find_fixed_range(last_block + 1),
None,
None,
segment,
);
}
}
self.writer.user_header_mut().increment_block();
if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
segment,
StaticFileProviderOperation::IncrementBlock,
Some(start.elapsed()),
);
}
Ok(())
}
fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
let next_static_file_block = self
.writer
.user_header()
.block_end()
.map(|b| b + 1)
.unwrap_or_else(|| self.writer.user_header().expected_block_start());
if expected_block_number != next_static_file_block {
return Err(ProviderError::UnexpectedStaticFileBlockNumber(
self.writer.user_header().segment(),
expected_block_number,
next_static_file_block,
))
}
Ok(())
}
fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
let mut remaining_rows = num_rows;
let segment = self.writer.user_header().segment();
while remaining_rows > 0 {
let len = match segment {
StaticFileSegment::Headers => {
self.writer.user_header().block_len().unwrap_or_default()
}
StaticFileSegment::Transactions | StaticFileSegment::Receipts => {
self.writer.user_header().tx_len().unwrap_or_default()
}
};
if remaining_rows >= len {
let block_start = self.writer.user_header().expected_block_start();
if block_start != 0 &&
(segment.is_headers() || last_block.is_some_and(|b| b < block_start))
{
self.delete_current_and_open_previous()?;
} else {
self.writer.user_header_mut().prune(len);
self.writer
.prune_rows(len as usize)
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
break
}
remaining_rows -= len;
} else {
self.writer.user_header_mut().prune(remaining_rows);
self.writer
.prune_rows(remaining_rows as usize)
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
remaining_rows = 0;
}
}
if let Some(last_block) = last_block {
let mut expected_block_start = self.writer.user_header().expected_block_start();
if num_rows == 0 {
while last_block < expected_block_start {
self.delete_current_and_open_previous()?;
expected_block_start = self.writer.user_header().expected_block_start();
}
}
self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
}
self.commit()?;
Ok(())
}
fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
let current_path = self.data_path.clone();
let (previous_writer, data_path) = Self::open(
self.user_header().segment(),
self.writer.user_header().expected_block_start() - 1,
self.reader.clone(),
self.metrics.clone(),
)?;
self.writer = previous_writer;
self.writer.set_dirty();
self.data_path = data_path;
NippyJar::<SegmentHeader>::load(¤t_path)
.map_err(|e| ProviderError::NippyJar(e.to_string()))?
.delete()
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
Ok(())
}
fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
self.buf.clear();
column.to_compact(&mut self.buf);
self.writer
.append_column(Some(Ok(&self.buf)))
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
Ok(())
}
fn append_with_tx_number<V: Compact>(
&mut self,
tx_num: TxNumber,
value: V,
) -> ProviderResult<()> {
if let Some(range) = self.writer.user_header().tx_range() {
let next_tx = range.end() + 1;
if next_tx != tx_num {
return Err(ProviderError::UnexpectedStaticFileTxNumber(
self.writer.user_header().segment(),
tx_num,
next_tx,
))
}
self.writer.user_header_mut().increment_tx();
} else {
self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
}
self.append_column(value)?;
Ok(())
}
pub fn append_header(
&mut self,
header: &N::BlockHeader,
total_difficulty: U256,
hash: &BlockHash,
) -> ProviderResult<()>
where
N::BlockHeader: Compact,
{
let start = Instant::now();
self.ensure_no_queued_prune()?;
debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
self.increment_block(header.number())?;
self.append_column(header)?;
self.append_column(CompactU256::from(total_difficulty))?;
self.append_column(hash)?;
if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
StaticFileSegment::Headers,
StaticFileProviderOperation::Append,
Some(start.elapsed()),
);
}
Ok(())
}
pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
where
N::SignedTx: Compact,
{
let start = Instant::now();
self.ensure_no_queued_prune()?;
debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
self.append_with_tx_number(tx_num, tx)?;
if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
StaticFileSegment::Transactions,
StaticFileProviderOperation::Append,
Some(start.elapsed()),
);
}
Ok(())
}
pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
where
N::Receipt: Compact,
{
let start = Instant::now();
self.ensure_no_queued_prune()?;
debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
self.append_with_tx_number(tx_num, receipt)?;
if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
StaticFileSegment::Receipts,
StaticFileProviderOperation::Append,
Some(start.elapsed()),
);
}
Ok(())
}
pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<Option<TxNumber>>
where
I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
R: Borrow<N::Receipt>,
N::Receipt: Compact,
{
debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
let mut receipts_iter = receipts.into_iter().peekable();
if receipts_iter.peek().is_none() {
return Ok(None);
}
let start = Instant::now();
self.ensure_no_queued_prune()?;
let mut tx_number = 0;
let mut count: u64 = 0;
for receipt_result in receipts_iter {
let (tx_num, receipt) = receipt_result?;
self.append_with_tx_number(tx_num, receipt.borrow())?;
tx_number = tx_num;
count += 1;
}
if let Some(metrics) = &self.metrics {
metrics.record_segment_operations(
StaticFileSegment::Receipts,
StaticFileProviderOperation::Append,
count,
Some(start.elapsed()),
);
}
Ok(Some(tx_number))
}
pub fn prune_transactions(
&mut self,
to_delete: u64,
last_block: BlockNumber,
) -> ProviderResult<()> {
debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
self.queue_prune(to_delete, Some(last_block))
}
pub fn prune_receipts(
&mut self,
to_delete: u64,
last_block: BlockNumber,
) -> ProviderResult<()> {
debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
self.queue_prune(to_delete, Some(last_block))
}
pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
self.queue_prune(to_delete, None)
}
fn queue_prune(
&mut self,
to_delete: u64,
last_block: Option<BlockNumber>,
) -> ProviderResult<()> {
self.ensure_no_queued_prune()?;
self.prune_on_commit = Some((to_delete, last_block));
Ok(())
}
fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
if self.prune_on_commit.is_some() {
return Err(ProviderError::NippyJar(
"Pruning should be committed before appending or pruning more data".to_string(),
))
}
Ok(())
}
fn prune_transaction_data(
&mut self,
to_delete: u64,
last_block: BlockNumber,
) -> ProviderResult<()> {
let start = Instant::now();
debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
self.truncate(to_delete, Some(last_block))?;
if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
StaticFileSegment::Transactions,
StaticFileProviderOperation::Prune,
Some(start.elapsed()),
);
}
Ok(())
}
fn prune_receipt_data(
&mut self,
to_delete: u64,
last_block: BlockNumber,
) -> ProviderResult<()> {
let start = Instant::now();
debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
self.truncate(to_delete, Some(last_block))?;
if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
StaticFileSegment::Receipts,
StaticFileProviderOperation::Prune,
Some(start.elapsed()),
);
}
Ok(())
}
fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
let start = Instant::now();
debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
self.truncate(to_delete, None)?;
if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
StaticFileSegment::Headers,
StaticFileProviderOperation::Prune,
Some(start.elapsed()),
);
}
Ok(())
}
fn reader(&self) -> StaticFileProvider<N> {
Self::upgrade_provider_to_strong_reference(&self.reader)
}
fn upgrade_provider_to_strong_reference(
provider: &Weak<StaticFileProviderInner<N>>,
) -> StaticFileProvider<N> {
provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
}
pub const fn user_header(&self) -> &SegmentHeader {
self.writer.user_header()
}
pub fn user_header_mut(&mut self) -> &mut SegmentHeader {
self.writer.user_header_mut()
}
#[cfg(any(test, feature = "test-utils"))]
pub fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
self.writer.user_header_mut().set_block_range(*block_range.start(), *block_range.end())
}
#[cfg(any(test, feature = "test-utils"))]
pub fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
&mut self.writer
}
}
fn create_jar(
segment: StaticFileSegment,
path: &Path,
expected_block_range: SegmentRangeInclusive,
) -> NippyJar<SegmentHeader> {
let mut jar = NippyJar::new(
segment.columns(),
path,
SegmentHeader::new(expected_block_range, None, None, segment),
);
if segment.is_headers() {
jar = jar.with_lz4();
}
jar
}