use alloy_primitives::{BlockNumber, TxNumber};
use reth_config::config::EtlConfig;
use reth_db::BlockNumberList;
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
models::sharded_key::NUM_OF_INDICES_IN_SHARD,
table::{Decompress, Table},
transaction::{DbTx, DbTxMut},
DatabaseError,
};
use reth_etl::Collector;
use reth_primitives::StaticFileSegment;
use reth_provider::{
providers::StaticFileProvider, BlockReader, DBProvider, ProviderError,
StaticFileProviderFactory,
};
use reth_stages_api::StageError;
use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
use tracing::info;
/// Number of distinct blocks whose indices are buffered in memory by
/// [`collect_history_indices`] before the buffer is spilled into the ETL collector.
const DEFAULT_CACHE_THRESHOLD: u64 = 100_000;

/// Walks the changeset table `CS` over `range` and gathers, for every partial key, the ascending
/// list of block numbers in which that key changed.
///
/// `partial_key_factory` maps a raw changeset entry to `(block_number, partial_key)`.
///
/// Lists are buffered in an in-memory map. The map is spilled into an ETL [`Collector`] once
/// more than [`DEFAULT_CACHE_THRESHOLD`] distinct blocks have been seen since the last spill,
/// and one final time at the end. Every spilled list is keyed by
/// `sharded_key_factory(partial_key, highest_block_in_list)`.
///
/// # Errors
/// Returns a [`StageError`] if reading the changeset table or inserting into the collector fails.
pub(crate) fn collect_history_indices<Provider, CS, H, P>(
    provider: &Provider,
    range: impl RangeBounds<CS::Key>,
    sharded_key_factory: impl Fn(P, BlockNumber) -> H::Key,
    partial_key_factory: impl Fn((CS::Key, CS::Value)) -> (u64, P),
    etl_config: &EtlConfig,
) -> Result<Collector<H::Key, H::Value>, StageError>
where
    Provider: DBProvider,
    CS: Table,
    H: Table<Value = BlockNumberList>,
    P: Copy + Eq + Hash,
{
    let mut cursor = provider.tx_ref().cursor_read::<CS>()?;
    let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
    let mut pending: HashMap<P, Vec<u64>> = HashMap::default();

    // Moves every buffered block list into the ETL collector, keyed by its highest block.
    let mut spill = |pending: &HashMap<P, Vec<u64>>| -> Result<(), StageError> {
        for (partial, blocks) in pending {
            let highest = *blocks.last().expect("qed");
            let key = sharded_key_factory(*partial, highest);
            let list = BlockNumberList::new_pre_sorted(blocks.iter().copied());
            collector.insert(key, list)?;
        }
        Ok(())
    };

    // observability
    let total = provider.tx_ref().entries::<CS>()?;
    let log_every = (total / 1000).max(1);

    let mut blocks_since_spill = 0;
    let mut last_seen_block = u64::MAX;

    for (position, entry) in cursor.walk_range(range)?.enumerate() {
        let (block_number, partial) = partial_key_factory(entry?);
        pending.entry(partial).or_default().push(block_number);

        let should_log = total > 1000 && position > 0 && position % log_every == 0;
        if should_log {
            info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (position as f64 / total as f64) * 100.0), "Collecting indices");
        }

        // Only block transitions advance the spill counter.
        if block_number == last_seen_block {
            continue
        }
        last_seen_block = block_number;
        blocks_since_spill += 1;

        if blocks_since_spill > DEFAULT_CACHE_THRESHOLD {
            spill(&pending)?;
            pending.clear();
            blocks_since_spill = 0;
        }
    }

    spill(&pending)?;
    Ok(collector)
}
/// Drains an ETL [`Collector`] of `(sharded_key, block_list)` pairs into the history table `H`.
///
/// Consecutive entries sharing the same partial key (as returned by `get_partial`) are merged
/// into one running list. [`load_indices`] splits that list into shards of at most
/// [`NUM_OF_INDICES_IN_SHARD`] block numbers. The last shard of every partial key is stored
/// under `sharded_key_factory(partial, u64::MAX)`.
///
/// When `append_only` is `false`, the existing `u64::MAX` shard of each partial key is read from
/// the database first, merged with the new indices, and written back with `upsert`. When it is
/// `true`, shards are written with `append`.
///
/// # Errors
/// Returns a [`StageError`] if collector iteration, key decoding, list decompression, or a
/// database read/write fails.
pub(crate) fn load_history_indices<Provider, H, P>(
    provider: &Provider,
    mut collector: Collector<H::Key, H::Value>,
    append_only: bool,
    sharded_key_factory: impl Clone + Fn(P, u64) -> <H as Table>::Key,
    decode_key: impl Fn(Vec<u8>) -> Result<<H as Table>::Key, DatabaseError>,
    get_partial: impl Fn(<H as Table>::Key) -> P,
) -> Result<(), StageError>
where
    Provider: DBProvider<Tx: DbTxMut>,
    H: Table<Value = BlockNumberList>,
    P: Copy + Default + Eq,
{
    let mut write_cursor = provider.tx_ref().cursor_write::<H>()?;

    // `None` until the first collector entry is seen. A `P::default()` sentinel cannot be told
    // apart from a real partial key equal to the default (e.g. the zero address). Such a key
    // sorts first, so its existing database shard was never merged and then got overwritten.
    let mut current_partial: Option<P> = None;
    let mut current_list = Vec::<u64>::new();

    // observability
    let total_entries = collector.len();
    let interval = (total_entries / 100).max(1);

    for (index, element) in collector.iter()?.enumerate() {
        let (k, v) = element?;
        let sharded_key = decode_key(k)?;
        let new_list = BlockNumberList::decompress_owned(v)?;

        if index > 0 && index % interval == 0 && total_entries > 100 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
        }

        let partial_key = get_partial(sharded_key);

        if current_partial != Some(partial_key) {
            // The previous partial key has no more entries: persist its remaining shard.
            if let Some(previous_partial) = current_partial {
                load_indices(
                    &mut write_cursor,
                    previous_partial,
                    &mut current_list,
                    &sharded_key_factory,
                    append_only,
                    LoadMode::Flush,
                )?;
            }

            current_partial = Some(partial_key);
            current_list.clear();

            // Outside append-only mode an earlier run may already have written a `u64::MAX`
            // shard for this key. Merge it so the rewritten shard keeps those indices.
            if !append_only {
                if let Some((_, last_database_shard)) =
                    write_cursor.seek_exact(sharded_key_factory(partial_key, u64::MAX))?
                {
                    current_list.extend(last_database_shard.iter());
                }
            }
        }

        current_list.extend(new_list.iter());
        load_indices(
            &mut write_cursor,
            partial_key,
            &mut current_list,
            &sharded_key_factory,
            append_only,
            LoadMode::KeepLast,
        )?;
    }

    // One shard of the final partial key is still in memory and must be written.
    if let Some(last_partial) = current_partial {
        load_indices(
            &mut write_cursor,
            last_partial,
            &mut current_list,
            &sharded_key_factory,
            append_only,
            LoadMode::Flush,
        )?;
    }

    Ok(())
}
/// Writes the block-number list `list` of `partial_key` through `cursor`, split into shards of
/// at most [`NUM_OF_INDICES_IN_SHARD`] entries.
///
/// * [`LoadMode::Flush`]: every shard is written. Intermediate shards are keyed by their highest
///   block number and the final shard by `u64::MAX`. `list` is left unchanged. An empty `list`
///   writes nothing.
/// * [`LoadMode::KeepLast`]: does nothing while `list` fits into one shard. Otherwise, writes
///   every shard except the last, keyed by its highest block number, and replaces `list` with
///   that trailing shard.
///
/// Shards are written with `append` when `append_only` is set, otherwise with `upsert`.
///
/// # Errors
/// Returns a [`StageError`] if a cursor write fails.
pub(crate) fn load_indices<H, C, P>(
    cursor: &mut C,
    partial_key: P,
    list: &mut Vec<BlockNumber>,
    sharded_key_factory: &impl Fn(P, BlockNumber) -> <H as Table>::Key,
    append_only: bool,
    mode: LoadMode,
) -> Result<(), StageError>
where
    C: DbCursorRO<H> + DbCursorRW<H>,
    H: Table<Value = BlockNumberList>,
    P: Copy,
{
    let flush = mode.is_flush();

    // While the list still fits into a single shard there is nothing to persist, unless the
    // caller explicitly asked for a flush.
    if !flush && list.len() <= NUM_OF_INDICES_IN_SHARD {
        return Ok(())
    }

    // Snapshot the shards up front: in `KeepLast` mode `list` is overwritten inside the loop.
    let shards: Vec<Vec<u64>> =
        list.chunks(NUM_OF_INDICES_IN_SHARD).map(<[u64]>::to_vec).collect();
    let final_position = shards.len().saturating_sub(1);

    for (position, shard) in shards.into_iter().enumerate() {
        let is_final = position == final_position;
        let shard_highest = *shard.last().expect("at least one index");

        if is_final && !flush {
            // Keep the trailing shard in memory so more indices can be appended to it.
            *list = shard;
            continue
        }

        // The most recent shard of a key is always stored under `u64::MAX`.
        let highest = if is_final { u64::MAX } else { shard_highest };
        let key = sharded_key_factory(partial_key, highest);
        let value = BlockNumberList::new_pre_sorted(shard);

        if append_only {
            cursor.append(key, value)?;
        } else {
            cursor.upsert(key, value)?;
        }
    }

    Ok(())
}
/// Controls how [`load_indices`] treats the trailing shard of a block-number list.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum LoadMode {
    /// Write only complete leading shards and keep the last one in memory so that more
    /// indices can be appended to it.
    KeepLast,
    /// Write every shard, including the last one (which is keyed by `u64::MAX`).
    Flush,
}

impl LoadMode {
    /// Returns `true` only for [`LoadMode::Flush`].
    const fn is_flush(&self) -> bool {
        matches!(self, Self::Flush)
    }
}
/// Builds a [`StageError::MissingStaticFileData`] that points at the block after the last one
/// whose transactions are covered by `last_tx_num`.
///
/// The search starts at the highest block stored in the static files for `segment` (block 0 if
/// there is none). It walks backwards until it finds a block whose body indices end at or before
/// `last_tx_num`, or until it reaches block 0. The following block is reported as missing. If
/// that block's sealed header is unavailable, a default header is used instead.
///
/// # Errors
/// Returns a [`ProviderError`] if reading block body indices or the sealed header fails.
pub(crate) fn missing_static_data_error<Provider>(
    last_tx_num: TxNumber,
    static_file_provider: &StaticFileProvider<Provider::Primitives>,
    provider: &Provider,
    segment: StaticFileSegment,
) -> Result<StageError, ProviderError>
where
    Provider: BlockReader + StaticFileProviderFactory,
{
    let mut last_block =
        static_file_provider.get_highest_static_file_block(segment).unwrap_or_default();

    // Walk backwards to the newest block fully covered by `last_tx_num`. Blocks without body
    // indices are skipped.
    loop {
        if let Some(indices) = provider.block_body_indices(last_block)? {
            if indices.last_tx_num() <= last_tx_num {
                break
            }
        }
        if last_block == 0 {
            break
        }
        last_block -= 1;
    }

    // The header is only borrowed to build `block_with_parent`, so it needs no heap allocation.
    let missing_block = provider.sealed_header(last_block + 1)?.unwrap_or_default();
    Ok(StageError::MissingStaticFileData {
        block: Box::new(missing_block.block_with_parent()),
        segment,
    })
}