use crate::{segments, segments::Segment, StaticFileProducerEvent};
use alloy_primitives::BlockNumber;
use parking_lot::Mutex;
use rayon::prelude::*;
use reth_codecs::Compact;
use reth_db::table::Value;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::StaticFileWriter, BlockReader, ChainStateBlockReader, DBProvider,
DatabaseProviderFactory, StageCheckpointReader, StaticFileProviderFactory,
};
use reth_prune_types::PruneModes;
use reth_stages_types::StageId;
use reth_static_file_types::{HighestStaticFiles, StaticFileTargets};
use reth_storage_errors::provider::ProviderResult;
use reth_tokio_util::{EventSender, EventStream};
use std::{
ops::{Deref, RangeInclusive},
sync::Arc,
time::Instant,
};
use tracing::{debug, trace};
/// Result of a [`StaticFileProducerInner::run`] execution: the processed
/// [`StaticFileTargets`] on success, a provider error otherwise.
pub type StaticFileProducerResult = ProviderResult<StaticFileTargets>;
/// A [`StaticFileProducer`] handle paired with a [`StaticFileProducerResult`].
pub type StaticFileProducerWithResult<Provider> =
(StaticFileProducer<Provider>, StaticFileProducerResult);
/// Static file producer handle.
///
/// Wraps a [`StaticFileProducerInner`] in an `Arc<Mutex<_>>`: clones of the handle refer to the
/// same inner producer, and the inner producer is reached by locking the mutex.
#[derive(Debug)]
pub struct StaticFileProducer<Provider>(Arc<Mutex<StaticFileProducerInner<Provider>>>);
impl<Provider> StaticFileProducer<Provider> {
    /// Creates a new [`StaticFileProducer`] around a freshly built
    /// [`StaticFileProducerInner`] for the given provider and prune configuration.
    pub fn new(provider: Provider, prune_modes: PruneModes) -> Self {
        let inner = StaticFileProducerInner::new(provider, prune_modes);
        Self(Arc::new(Mutex::new(inner)))
    }
}
impl<Provider> Clone for StaticFileProducer<Provider> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<Provider> Deref for StaticFileProducer<Provider> {
type Target = Arc<Mutex<StaticFileProducerInner<Provider>>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// The producer state held behind [`StaticFileProducer`]'s mutex. Its `run` method copies the
/// targeted block ranges from the database into static files.
#[derive(Debug)]
pub struct StaticFileProducerInner<Provider> {
/// Provider factory. Used to open read-only database providers and to reach the static file
/// provider.
provider: Provider,
/// Pruning configuration. Consulted when computing targets: receipts are only targeted when
/// `receipts` is `None` and `receipts_log_filter` is empty.
prune_modes: PruneModes,
/// Sender used to notify listeners about [`StaticFileProducerEvent`]s.
event_sender: EventSender<StaticFileProducerEvent>,
}
impl<Provider> StaticFileProducerInner<Provider> {
fn new(provider: Provider, prune_modes: PruneModes) -> Self {
Self { provider, prune_modes, event_sender: Default::default() }
}
}
impl<Provider> StaticFileProducerInner<Provider>
where
    Provider: StaticFileProviderFactory + DatabaseProviderFactory<Provider: ChainStateBlockReader>,
{
    /// Returns the last finalized block number reported by a freshly opened read-only database
    /// provider, or `None` if the provider reports none.
    ///
    /// # Errors
    ///
    /// Fails if the database provider cannot be opened or the lookup itself fails.
    pub fn last_finalized_block(&self) -> ProviderResult<Option<BlockNumber>> {
        let db_provider = self.provider.database_provider_ro()?;
        db_provider.last_finalized_block_number()
    }
}
impl<Provider> StaticFileProducerInner<Provider>
where
Provider: StaticFileProviderFactory
+ DatabaseProviderFactory<
Provider: StaticFileProviderFactory<
Primitives: NodePrimitives<
SignedTx: Value + Compact,
BlockHeader: Value + Compact,
Receipt: Value + Compact,
>,
> + StageCheckpointReader
+ BlockReader,
>,
{
/// Returns a new event stream listening to this producer's [`StaticFileProducerEvent`]s.
pub fn events(&self) -> EventStream<StaticFileProducerEvent> {
    let sender = &self.event_sender;
    sender.new_listener()
}
/// Runs the static file producer for the given `targets`.
///
/// If no segment has a target, nothing is done and `targets` is returned as is. Otherwise every
/// targeted segment is copied from the database into static files (segments are processed
/// through a parallel iterator), the static file provider is committed, and its index is
/// updated with the last block of each processed range.
///
/// Listeners are notified with [`StaticFileProducerEvent::Started`] before any copying and
/// with [`StaticFileProducerEvent::Finished`] after the index has been updated.
///
/// # Errors
///
/// Returns a provider error if opening a database provider, copying a segment, committing, or
/// updating the index fails. No `Finished` event is sent in that case.
///
/// # Panics
///
/// In builds with debug assertions enabled, panics if
/// `targets.is_contiguous_to_highest_static_files(..)` does not hold for the current highest
/// static files.
pub fn run(&self, targets: StaticFileTargets) -> StaticFileProducerResult {
    // Nothing to produce: hand the (empty) targets straight back.
    if !targets.any() {
        return Ok(targets)
    }

    debug_assert!(targets.is_contiguous_to_highest_static_files(
        self.provider.static_file_provider().get_highest_static_files()
    ));

    self.event_sender.notify(StaticFileProducerEvent::Started { targets: targets.clone() });

    debug!(target: "static_file", ?targets, "StaticFileProducer started");
    let run_started_at = Instant::now();

    // Pair every targeted segment with the block range it has to copy.
    let mut jobs: Vec<(Box<dyn Segment<Provider::Provider>>, RangeInclusive<BlockNumber>)> =
        Vec::new();
    if let Some(block_range) = &targets.transactions {
        jobs.push((Box::new(segments::Transactions), block_range.clone()));
    }
    if let Some(block_range) = &targets.headers {
        jobs.push((Box::new(segments::Headers), block_range.clone()));
    }
    if let Some(block_range) = &targets.receipts {
        jobs.push((Box::new(segments::Receipts), block_range.clone()));
    }

    jobs.par_iter().try_for_each(|(segment, block_range)| -> ProviderResult<()> {
        debug!(target: "static_file", segment = %segment.segment(), ?block_range, "StaticFileProducer segment");
        let segment_started_at = Instant::now();

        // Every segment opens its own read-only database provider, with the long read
        // transaction safety disabled.
        let db_provider =
            self.provider.database_provider_ro()?.disable_long_read_transaction_safety();
        segment.copy_to_static_files(db_provider, block_range.clone())?;

        let elapsed = segment_started_at.elapsed();
        debug!(target: "static_file", segment = %segment.segment(), ?block_range, ?elapsed, "Finished StaticFileProducer segment");

        Ok(())
    })?;

    // Commit first, then record the new highest block of every processed segment.
    self.provider.static_file_provider().commit()?;
    for (segment, block_range) in jobs {
        let highest_block = *block_range.end();
        self.provider
            .static_file_provider()
            .update_index(segment.segment(), Some(highest_block))?;
    }

    let elapsed = run_started_at.elapsed();
    debug!(target: "static_file", ?targets, ?elapsed, "StaticFileProducer finished");

    self.event_sender
        .notify(StaticFileProducerEvent::Finished { targets: targets.clone(), elapsed });

    Ok(targets)
}
pub fn copy_to_static_files(&self) -> ProviderResult<HighestStaticFiles> {
let provider = self.provider.database_provider_ro()?;
let stages_checkpoints = [StageId::Headers, StageId::Execution, StageId::Bodies]
.into_iter()
.map(|stage| provider.get_stage_checkpoint(stage).map(|c| c.map(|c| c.block_number)))
.collect::<Result<Vec<_>, _>>()?;
let highest_static_files = HighestStaticFiles {
headers: stages_checkpoints[0],
receipts: stages_checkpoints[1],
transactions: stages_checkpoints[2],
};
let targets = self.get_static_file_targets(highest_static_files)?;
self.run(targets)?;
Ok(highest_static_files)
}
pub fn get_static_file_targets(
&self,
finalized_block_numbers: HighestStaticFiles,
) -> ProviderResult<StaticFileTargets> {
let highest_static_files = self.provider.static_file_provider().get_highest_static_files();
let targets = StaticFileTargets {
headers: finalized_block_numbers.headers.and_then(|finalized_block_number| {
self.get_static_file_target(highest_static_files.headers, finalized_block_number)
}),
receipts: if self.prune_modes.receipts.is_none() &&
self.prune_modes.receipts_log_filter.is_empty()
{
finalized_block_numbers.receipts.and_then(|finalized_block_number| {
self.get_static_file_target(
highest_static_files.receipts,
finalized_block_number,
)
})
} else {
None
},
transactions: finalized_block_numbers.transactions.and_then(|finalized_block_number| {
self.get_static_file_target(
highest_static_files.transactions,
finalized_block_number,
)
}),
};
trace!(
target: "static_file",
?finalized_block_numbers,
?highest_static_files,
?targets,
any = %targets.any(),
"StaticFile targets"
);
Ok(targets)
}
/// Returns the inclusive block range between the block following `highest_static_file` (or
/// block 0 when it is `None`) and `finalized_block_number`, or `None` if that range is empty.
fn get_static_file_target(
    &self,
    highest_static_file: Option<BlockNumber>,
    finalized_block_number: BlockNumber,
) -> Option<RangeInclusive<BlockNumber>> {
    let first_missing_block = match highest_static_file {
        Some(block) => block + 1,
        None => 0,
    };

    // An inclusive range is empty exactly when its start lies past its end.
    if first_missing_block > finalized_block_number {
        return None
    }
    Some(first_missing_block..=finalized_block_number)
}
}
#[cfg(test)]
mod tests {
use crate::static_file_producer::{
StaticFileProducer, StaticFileProducerInner, StaticFileTargets,
};
use alloy_primitives::{B256, U256};
use assert_matches::assert_matches;
use reth_db_api::{database::Database, transaction::DbTx};
use reth_provider::{
providers::StaticFileWriter, test_utils::MockNodeTypesWithDB, ProviderError,
ProviderFactory, StaticFileProviderFactory,
};
use reth_prune_types::PruneModes;
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_static_file_types::{HighestStaticFiles, StaticFileSegment};
use reth_testing_utils::generators::{
self, random_block_range, random_receipt, BlockRangeParams,
};
use std::{sync::mpsc::channel, time::Duration};
use tempfile::TempDir;
/// Builds the shared test fixture: a provider factory over a test database holding random
/// blocks 0..=3 with their headers and receipts stored in the database, plus the temporary
/// static files directory, which is returned alongside the factory.
fn setup() -> (ProviderFactory<MockNodeTypesWithDB>, TempDir) {
    let mut rng = generators::rng();
    let db = TestStageDB::default();

    let block_params =
        BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() };
    let blocks = random_block_range(&mut rng, 0..=3, block_params);
    db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

    // Prune one header per generated block from the headers static file.
    let headers_provider = db.factory.static_file_provider();
    let mut headers_writer = headers_provider
        .latest_writer(StaticFileSegment::Headers)
        .expect("get static file writer for headers");
    headers_writer.prune_headers(blocks.len() as u64).unwrap();
    headers_writer.commit().expect("prune headers");

    // Write the headers into the database instead.
    let db_tx = db.factory.db_ref().tx_mut().expect("init tx");
    blocks.iter().for_each(|block| {
        TestStageDB::insert_header(None, &db_tx, &block.header, U256::ZERO)
            .expect("insert block header");
    });
    db_tx.commit().expect("commit tx");

    // One random receipt per transaction, keyed by the transaction's running index.
    let receipts: Vec<_> = blocks
        .iter()
        .flat_map(|block| &block.body.transactions)
        .enumerate()
        .map(|(tx_number, transaction)| {
            (tx_number as u64, random_receipt(&mut rng, transaction, Some(0)))
        })
        .collect();
    db.insert_receipts(receipts).expect("insert receipts");

    (db.factory, db.temp_static_files_dir)
}
/// Runs the producer in three rounds: up to block 1, up to block 3, and up to block 4. The
/// first two rounds must succeed and advance the highest static files; the last one must fail
/// with `BlockBodyIndicesNotFound(4)` and leave the highest static files at block 3.
#[test]
fn run() {
    let (provider_factory, _temp_static_files_dir) = setup();

    let static_file_producer =
        StaticFileProducerInner::new(provider_factory.clone(), PruneModes::default());

    // `HighestStaticFiles` carrying the same block number for every segment.
    let uniform = |block: u64| HighestStaticFiles {
        headers: Some(block),
        receipts: Some(block),
        transactions: Some(block),
    };

    // Successful rounds: (upper bound, range expected for every segment).
    for (finalized, expected_range) in [(1, 0..=1), (3, 2..=3)] {
        let targets = static_file_producer
            .get_static_file_targets(uniform(finalized))
            .expect("get static file targets");
        assert_eq!(
            targets,
            StaticFileTargets {
                headers: Some(expected_range.clone()),
                receipts: Some(expected_range.clone()),
                transactions: Some(expected_range)
            }
        );
        assert_matches!(static_file_producer.run(targets), Ok(_));
        assert_eq!(
            provider_factory.static_file_provider().get_highest_static_files(),
            uniform(finalized)
        );
    }

    // Failing round: targets are still computed for block 4, but running them errors out.
    let targets = static_file_producer
        .get_static_file_targets(uniform(4))
        .expect("get static file targets");
    assert_eq!(
        targets,
        StaticFileTargets {
            headers: Some(4..=4),
            receipts: Some(4..=4),
            transactions: Some(4..=4)
        }
    );
    assert_matches!(
        static_file_producer.run(targets),
        Err(ProviderError::BlockBodyIndicesNotFound(4))
    );
    assert_eq!(
        provider_factory.static_file_provider().get_highest_static_files(),
        uniform(3)
    );
}
/// Checks that only one of several threads sharing a [`StaticFileProducer`] actually produces
/// static files: each thread locks the producer, computes targets up to block 1, runs them and
/// sends the targets back. Every set of targets received after the first must be empty.
///
/// The worker threads are joined at the end so that a panic inside a worker (for example a
/// failed `run`) fails the test instead of being silently lost with the detached thread.
#[test]
fn only_one() {
    let (provider_factory, _temp_static_files_dir) = setup();

    let static_file_producer = StaticFileProducer::new(provider_factory, PruneModes::default());

    let (tx, rx) = channel();

    let mut workers = Vec::with_capacity(5);
    for i in 0..5 {
        let producer = static_file_producer.clone();
        let tx = tx.clone();

        workers.push(std::thread::spawn(move || {
            let locked_producer = producer.lock();
            if i == 0 {
                // Hold the lock for a while so the remaining threads get spawned and queue up
                // behind it.
                std::thread::sleep(Duration::from_millis(100));
            }
            let targets = locked_producer
                .get_static_file_targets(HighestStaticFiles {
                    headers: Some(1),
                    receipts: Some(1),
                    transactions: Some(1),
                })
                .expect("get static file targets");
            assert_matches!(locked_producer.run(targets.clone()), Ok(_));
            tx.send(targets).unwrap();
        }));
    }

    // Drop the original sender so the receiver loop ends once every worker is done.
    drop(tx);

    let mut only_one = Some(());
    for target in rx {
        // Only the first received targets may be non-empty.
        assert!(only_one.take().is_some_and(|_| target.any()) || !target.any())
    }

    // All senders are gone at this point, so every worker has finished (or panicked);
    // joining cannot block and surfaces any worker panic.
    for worker in workers {
        worker.join().expect("static file producer thread panicked");
    }
}
}