use crate::{
bundle_state::StorageRevertsIter,
providers::{database::metrics, static_file::StaticFileWriter, StaticFileProvider},
to_range,
traits::{
AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
},
writer::UnifiedStorageWriter,
AccountReader, BlockExecutionWriter, BlockHashReader, BlockNumReader, BlockReader, BlockWriter,
BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter, DBProvider, EvmEnvProvider,
HashingWriter, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider, HistoricalStateProvider,
HistoricalStateProviderRef, HistoryWriter, LatestStateProvider, LatestStateProviderRef,
OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RevertsInit,
StageCheckpointReader, StateChangeWriter, StateProviderBox, StateReader, StateWriter,
StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
TransactionsProvider, TransactionsProviderExt, TrieWriter, WithdrawalsProvider,
};
use alloy_consensus::Header;
use alloy_eips::{
eip4895::{Withdrawal, Withdrawals},
BlockHashOrNumber,
};
use alloy_primitives::{keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256};
use itertools::{izip, Itertools};
use rayon::slice::ParallelSliceMut;
use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, EthereumHardforks};
use reth_db::{
cursor::DbDupCursorRW, tables, BlockNumberList, PlainAccountState, PlainStorageState,
};
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
database::Database,
models::{
sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
ShardedKey, StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals,
},
table::Table,
transaction::{DbTx, DbTxMut},
DatabaseError, DbTxUnwindExt,
};
use reth_evm::ConfigureEvmEnv;
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_network_p2p::headers::downloader::SyncTarget;
use reth_node_types::NodeTypes;
use reth_primitives::{
Account, Block, BlockBody, BlockWithSenders, Bytecode, GotExpected, Receipt, SealedBlock,
SealedBlockWithSenders, SealedHeader, StaticFileSegment, StorageEntry, TransactionMeta,
TransactionSigned, TransactionSignedEcRecovered, TransactionSignedNoHash,
};
use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_storage_api::{StateProvider, StorageChangeSetReader, TryIntoHistoricalStateProvider};
use reth_storage_errors::provider::{ProviderResult, RootMismatch};
use reth_trie::{
prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets},
updates::{StorageTrieUpdates, TrieUpdates},
HashedPostStateSorted, Nibbles, StateRoot, StoredNibbles,
};
use reth_trie_db::{DatabaseStateRoot, DatabaseStorageTrieCursor};
use revm::{
db::states::{PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset},
primitives::{BlockEnv, CfgEnvWithHandlerCfg},
};
use std::{
cmp::Ordering,
collections::{hash_map, BTreeMap, BTreeSet, HashMap, HashSet},
fmt::Debug,
ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
sync::{mpsc, Arc},
time::{Duration, Instant},
};
use tokio::sync::watch;
use tracing::{debug, error, trace, warn};
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
#[derive(Debug)]
pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
pub DatabaseProvider<<DB as Database>::TXMut, N>,
);
impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
for DatabaseProviderRW<DB, N>
{
fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
&self.0
}
}
impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
pub fn commit(self) -> ProviderResult<bool> {
self.0.commit()
}
pub fn into_tx(self) -> <DB as Database>::TXMut {
self.0.into_tx()
}
}
impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
for DatabaseProvider<<DB as Database>::TXMut, N>
{
fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
provider.0
}
}
#[derive(Debug)]
pub struct DatabaseProvider<TX, N: NodeTypes> {
tx: TX,
chain_spec: Arc<N::ChainSpec>,
static_file_provider: StaticFileProvider,
prune_modes: PruneModes,
}
impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
pub const fn prune_modes_ref(&self) -> &PruneModes {
&self.prune_modes
}
}
impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
pub fn latest<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
trace!(target: "providers::db", "Returning latest state provider");
Ok(Box::new(LatestStateProviderRef::new(&self.tx, self.static_file_provider.clone())))
}
pub fn history_by_block_hash<'a>(
&'a self,
block_hash: BlockHash,
) -> ProviderResult<Box<dyn StateProvider + 'a>> {
let mut block_number =
self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
if block_number == self.best_block_number().unwrap_or_default() &&
block_number == self.last_block_number().unwrap_or_default()
{
return Ok(Box::new(LatestStateProviderRef::new(
&self.tx,
self.static_file_provider.clone(),
)))
}
block_number += 1;
let account_history_prune_checkpoint =
self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
let storage_history_prune_checkpoint =
self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
let mut state_provider = HistoricalStateProviderRef::new(
&self.tx,
block_number,
self.static_file_provider.clone(),
);
if let Some(prune_checkpoint_block_number) =
account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
{
state_provider = state_provider.with_lowest_available_account_history_block_number(
prune_checkpoint_block_number + 1,
);
}
if let Some(prune_checkpoint_block_number) =
storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
{
state_provider = state_provider.with_lowest_available_storage_history_block_number(
prune_checkpoint_block_number + 1,
);
}
Ok(Box::new(state_provider))
}
}
impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
fn static_file_provider(&self) -> StaticFileProvider {
self.static_file_provider.clone()
}
}
impl<TX: Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
for DatabaseProvider<TX, N>
{
type ChainSpec = N::ChainSpec;
fn chain_spec(&self) -> Arc<Self::ChainSpec> {
self.chain_spec.clone()
}
}
impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
pub const fn new_rw(
tx: TX,
chain_spec: Arc<N::ChainSpec>,
static_file_provider: StaticFileProvider,
prune_modes: PruneModes,
) -> Self {
Self { tx, chain_spec, static_file_provider, prune_modes }
}
}
impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
fn as_ref(&self) -> &Self {
self
}
}
impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
fn try_into_history_at_block(
self,
mut block_number: BlockNumber,
) -> ProviderResult<StateProviderBox> {
if block_number == self.best_block_number().unwrap_or_default() &&
block_number == self.last_block_number().unwrap_or_default()
{
return Ok(Box::new(LatestStateProvider::new(self.tx, self.static_file_provider)))
}
block_number += 1;
let account_history_prune_checkpoint =
self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
let storage_history_prune_checkpoint =
self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
let mut state_provider =
HistoricalStateProvider::new(self.tx, block_number, self.static_file_provider);
if let Some(prune_checkpoint_block_number) =
account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
{
state_provider = state_provider.with_lowest_available_account_history_block_number(
prune_checkpoint_block_number + 1,
);
}
if let Some(prune_checkpoint_block_number) =
storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
{
state_provider = state_provider.with_lowest_available_storage_history_block_number(
prune_checkpoint_block_number + 1,
);
}
Ok(Box::new(state_provider))
}
}
impl<Tx: DbTx + DbTxMut + 'static, N: NodeTypes<ChainSpec: EthereumHardforks> + 'static>
DatabaseProvider<Tx, N>
{
pub fn insert_historical_block(
&self,
block: SealedBlockWithSenders,
) -> ProviderResult<StoredBlockBodyIndices> {
let ttd = if block.number == 0 {
block.difficulty
} else {
let parent_block_number = block.number - 1;
let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
parent_ttd + block.difficulty
};
let mut writer = self.static_file_provider.latest_writer(StaticFileSegment::Headers)?;
let segment_header = writer.user_header();
if segment_header.block_end().is_none() && segment_header.expected_block_start() == 0 {
for block_number in 0..block.number {
let mut prev = block.header.clone().unseal();
prev.number = block_number;
writer.append_header(&prev, U256::ZERO, &B256::ZERO)?;
}
}
writer.append_header(block.header.as_ref(), ttd, &block.hash())?;
self.insert_block(block)
}
}
fn unwind_history_shards<S, T, C>(
cursor: &mut C,
start_key: T::Key,
block_number: BlockNumber,
mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
) -> ProviderResult<Vec<u64>>
where
T: Table<Value = BlockNumberList>,
T::Key: AsRef<ShardedKey<S>>,
C: DbCursorRO<T> + DbCursorRW<T>,
{
let mut item = cursor.seek_exact(start_key)?;
while let Some((sharded_key, list)) = item {
if !shard_belongs_to_key(&sharded_key) {
break
}
cursor.delete_current()?;
let first = list.iter().next().expect("List can't be empty");
if first >= block_number {
item = cursor.prev()?;
continue
} else if block_number <= sharded_key.as_ref().highest_block_number {
return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
}
return Ok(list.iter().collect::<Vec<_>>())
}
Ok(Vec::new())
}
impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
pub const fn new(
tx: TX,
chain_spec: Arc<N::ChainSpec>,
static_file_provider: StaticFileProvider,
prune_modes: PruneModes,
) -> Self {
Self { tx, chain_spec, static_file_provider, prune_modes }
}
pub fn into_tx(self) -> TX {
self.tx
}
pub fn tx_mut(&mut self) -> &mut TX {
&mut self.tx
}
pub const fn tx_ref(&self) -> &TX {
&self.tx
}
pub fn chain_spec(&self) -> &N::ChainSpec {
&self.chain_spec
}
fn transactions_by_tx_range_with_cursor<C>(
&self,
range: impl RangeBounds<TxNumber>,
cursor: &mut C,
) -> ProviderResult<Vec<TransactionSignedNoHash>>
where
C: DbCursorRO<tables::Transactions>,
{
self.static_file_provider.get_range_with_static_file_or_database(
StaticFileSegment::Transactions,
to_range(range),
|static_file, range, _| static_file.transactions_by_tx_range(range),
|range, _| self.cursor_collect(cursor, range),
|_| true,
)
}
fn block_with_senders<H, HF, B, BF>(
&self,
id: BlockHashOrNumber,
transaction_kind: TransactionVariant,
header_by_number: HF,
construct_block: BF,
) -> ProviderResult<Option<B>>
where
N::ChainSpec: EthereumHardforks,
H: AsRef<Header>,
HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
BF: FnOnce(
H,
Vec<TransactionSigned>,
Vec<Address>,
Vec<Header>,
Option<Withdrawals>,
) -> ProviderResult<Option<B>>,
{
let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
let Some(header) = header_by_number(block_number)? else { return Ok(None) };
let ommers = self.ommers(block_number.into())?.unwrap_or_default();
let withdrawals =
self.withdrawals_by_block(block_number.into(), header.as_ref().timestamp)?;
let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
let tx_range = body.tx_num_range();
let (transactions, senders) = if tx_range.is_empty() {
(vec![], vec![])
} else {
(self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
};
let body = transactions
.into_iter()
.map(|tx| match transaction_kind {
TransactionVariant::NoHash => TransactionSigned {
hash: B256::ZERO,
signature: tx.signature,
transaction: tx.transaction,
},
TransactionVariant::WithHash => tx.with_hash(),
})
.collect();
construct_block(header, body, senders, ommers, withdrawals)
}
fn block_range<F, H, HF, R>(
&self,
range: RangeInclusive<BlockNumber>,
headers_range: HF,
mut assemble_block: F,
) -> ProviderResult<Vec<R>>
where
N::ChainSpec: EthereumHardforks,
H: AsRef<Header>,
HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
F: FnMut(H, Range<TxNumber>, Vec<Header>, Option<Withdrawals>) -> ProviderResult<R>,
{
if range.is_empty() {
return Ok(Vec::new())
}
let len = range.end().saturating_sub(*range.start()) as usize;
let mut blocks = Vec::with_capacity(len);
let headers = headers_range(range)?;
let mut ommers_cursor = self.tx.cursor_read::<tables::BlockOmmers>()?;
let mut withdrawals_cursor = self.tx.cursor_read::<tables::BlockWithdrawals>()?;
let mut block_body_cursor = self.tx.cursor_read::<tables::BlockBodyIndices>()?;
for header in headers {
let header_ref = header.as_ref();
if let Some((_, block_body_indices)) =
block_body_cursor.seek_exact(header_ref.number)?
{
let tx_range = block_body_indices.tx_num_range();
let withdrawals =
if self.chain_spec.is_shanghai_active_at_timestamp(header_ref.timestamp) {
withdrawals_cursor
.seek_exact(header_ref.number)?
.map(|(_, w)| w.withdrawals)
.unwrap_or_default()
.into()
} else {
None
};
let ommers =
if self.chain_spec.final_paris_total_difficulty(header_ref.number).is_some() {
Vec::new()
} else {
ommers_cursor
.seek_exact(header_ref.number)?
.map(|(_, o)| o.ommers)
.unwrap_or_default()
};
if let Ok(b) = assemble_block(header, tx_range, ommers, withdrawals) {
blocks.push(b);
}
}
}
Ok(blocks)
}
fn block_with_senders_range<H, HF, B, BF>(
&self,
range: RangeInclusive<BlockNumber>,
headers_range: HF,
assemble_block: BF,
) -> ProviderResult<Vec<B>>
where
N::ChainSpec: EthereumHardforks,
H: AsRef<Header>,
HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
BF: Fn(
H,
Vec<TransactionSigned>,
Vec<Header>,
Option<Withdrawals>,
Vec<Address>,
) -> ProviderResult<B>,
{
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions>()?;
let mut senders_cursor = self.tx.cursor_read::<tables::TransactionSenders>()?;
self.block_range(range, headers_range, |header, tx_range, ommers, withdrawals| {
let (body, senders) = if tx_range.is_empty() {
(Vec::new(), Vec::new())
} else {
let body = self
.transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)?
.into_iter()
.map(Into::into)
.collect::<Vec<TransactionSigned>>();
let known_senders =
senders_cursor
.walk_range(tx_range.clone())?
.collect::<Result<HashMap<_, _>, _>>()?;
let mut senders = Vec::with_capacity(body.len());
for (tx_num, tx) in tx_range.zip(body.iter()) {
match known_senders.get(&tx_num) {
None => {
let sender = tx
.recover_signer_unchecked()
.ok_or(ProviderError::SenderRecoveryError)?;
senders.push(sender);
}
Some(sender) => senders.push(*sender),
}
}
(body, senders)
};
assemble_block(header, body, ommers, withdrawals, senders)
})
}
pub fn get_state(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Option<ExecutionOutcome>> {
if range.is_empty() {
return Ok(None)
}
let start_block_number = *range.start();
let block_bodies = self.get::<tables::BlockBodyIndices>(range.clone())?;
let Some(from_transaction_num) = block_bodies.first().map(|bodies| bodies.1.first_tx_num())
else {
return Ok(None)
};
let Some(to_transaction_num) = block_bodies.last().map(|bodies| bodies.1.last_tx_num())
else {
return Ok(None)
};
let storage_range = BlockNumberAddress::range(range.clone());
let storage_changeset = self.get::<tables::StorageChangeSets>(storage_range)?;
let account_changeset = self.get::<tables::AccountChangeSets>(range)?;
let mut plain_accounts_cursor = self.tx.cursor_read::<tables::PlainAccountState>()?;
let mut plain_storage_cursor = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
let (state, reverts) = self.populate_bundle_state(
account_changeset,
storage_changeset,
&mut plain_accounts_cursor,
&mut plain_storage_cursor,
)?;
let mut receipt_iter =
self.get::<tables::Receipts>(from_transaction_num..=to_transaction_num)?.into_iter();
let mut receipts = Vec::with_capacity(block_bodies.len());
for (_, block_body) in block_bodies {
let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
for _ in block_body.tx_num_range() {
if let Some((_, receipt)) = receipt_iter.next() {
block_receipts.push(Some(receipt));
}
}
receipts.push(block_receipts);
}
Ok(Some(ExecutionOutcome::new_init(
state,
reverts,
Vec::new(),
receipts.into(),
start_block_number,
Vec::new(),
)))
}
fn populate_bundle_state<A, S>(
&self,
account_changeset: Vec<(u64, AccountBeforeTx)>,
storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
plain_accounts_cursor: &mut A,
plain_storage_cursor: &mut S,
) -> ProviderResult<(BundleStateInit, RevertsInit)>
where
A: DbCursorRO<PlainAccountState>,
S: DbDupCursorRO<PlainStorageState>,
{
let mut state: BundleStateInit = HashMap::default();
let mut reverts: RevertsInit = HashMap::default();
for (block_number, account_before) in account_changeset.into_iter().rev() {
let AccountBeforeTx { info: old_info, address } = account_before;
match state.entry(address) {
hash_map::Entry::Vacant(entry) => {
let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
entry.insert((old_info, new_info, HashMap::default()));
}
hash_map::Entry::Occupied(mut entry) => {
entry.get_mut().0 = old_info;
}
}
reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
}
for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
let BlockNumberAddress((block_number, address)) = block_and_address;
let account_state = match state.entry(address) {
hash_map::Entry::Vacant(entry) => {
let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
entry.insert((present_info, present_info, HashMap::default()))
}
hash_map::Entry::Occupied(entry) => entry.into_mut(),
};
match account_state.2.entry(old_storage.key) {
hash_map::Entry::Vacant(entry) => {
let new_storage = plain_storage_cursor
.seek_by_key_subkey(address, old_storage.key)?
.filter(|storage| storage.key == old_storage.key)
.unwrap_or_default();
entry.insert((old_storage.value, new_storage.value));
}
hash_map::Entry::Occupied(mut entry) => {
entry.get_mut().0 = old_storage.value;
}
};
reverts
.entry(block_number)
.or_default()
.entry(address)
.or_default()
.1
.push(old_storage);
}
Ok((state, reverts))
}
}
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
pub fn commit(self) -> ProviderResult<bool> {
Ok(self.tx.commit()?)
}
pub fn remove_block_transaction_range(
&self,
range: impl RangeBounds<BlockNumber> + Clone,
) -> ProviderResult<()> {
let block_bodies = self.take::<tables::BlockBodyIndices>(range)?;
if block_bodies.is_empty() {
return Ok(())
}
let first_transaction = block_bodies.first().expect("If we have headers").1.first_tx_num();
let last_transaction = block_bodies.last().expect("Not empty").1.last_tx_num();
if last_transaction < first_transaction {
return Ok(())
}
let transactions = self
.take::<tables::Transactions>(first_transaction..=last_transaction)?
.into_iter()
.map(|(id, tx)| (id, tx.into()))
.collect::<Vec<(u64, TransactionSigned)>>();
self.remove::<tables::TransactionSenders>(first_transaction..=last_transaction)?;
let mut tx_hash_cursor = self.tx.cursor_write::<tables::TransactionHashNumbers>()?;
for (_, tx) in &transactions {
if tx_hash_cursor.seek_exact(tx.hash())?.is_some() {
tx_hash_cursor.delete_current()?;
}
}
if !transactions.is_empty() {
let tx_id_range = transactions.first().unwrap().0..=transactions.last().unwrap().0;
self.remove::<tables::TransactionBlocks>(tx_id_range)?;
}
Ok(())
}
pub fn take_block_transaction_range(
&self,
range: impl RangeBounds<BlockNumber> + Clone,
) -> ProviderResult<Vec<(BlockNumber, Vec<TransactionSignedEcRecovered>)>> {
let block_bodies = self.get::<tables::BlockBodyIndices>(range)?;
if block_bodies.is_empty() {
return Ok(Vec::new())
}
let first_transaction = block_bodies.first().expect("If we have headers").1.first_tx_num();
let last_transaction = block_bodies.last().expect("Not empty").1.last_tx_num();
if last_transaction < first_transaction {
return Ok(block_bodies.into_iter().map(|(n, _)| (n, Vec::new())).collect())
}
let transactions = self
.take::<tables::Transactions>(first_transaction..=last_transaction)?
.into_iter()
.map(|(id, tx)| (id, tx.into()))
.collect::<Vec<(u64, TransactionSigned)>>();
let mut senders =
self.take::<tables::TransactionSenders>(first_transaction..=last_transaction)?;
recover_block_senders(&mut senders, &transactions, first_transaction, last_transaction)?;
let mut tx_hash_cursor = self.tx.cursor_write::<tables::TransactionHashNumbers>()?;
for (_, tx) in &transactions {
if tx_hash_cursor.seek_exact(tx.hash())?.is_some() {
tx_hash_cursor.delete_current()?;
}
}
if !transactions.is_empty() {
let tx_id_range = transactions.first().unwrap().0..=transactions.last().unwrap().0;
self.remove::<tables::TransactionBlocks>(tx_id_range)?;
}
let mut block_tx = Vec::with_capacity(block_bodies.len());
let mut senders = senders.into_iter();
let mut transactions = transactions.into_iter();
for (block_number, block_body) in block_bodies {
let mut one_block_tx = Vec::with_capacity(block_body.tx_count as usize);
for _ in block_body.tx_num_range() {
let tx = transactions.next();
let sender = senders.next();
let recovered = match (tx, sender) {
(Some((tx_id, tx)), Some((sender_tx_id, sender))) => {
if tx_id == sender_tx_id {
Ok(TransactionSignedEcRecovered::from_signed_transaction(tx, sender))
} else {
Err(ProviderError::MismatchOfTransactionAndSenderId { tx_id })
}
}
(Some((tx_id, _)), _) | (_, Some((tx_id, _))) => {
Err(ProviderError::MismatchOfTransactionAndSenderId { tx_id })
}
(None, None) => Err(ProviderError::BlockBodyTransactionCount),
}?;
one_block_tx.push(recovered)
}
block_tx.push((block_number, one_block_tx));
}
Ok(block_tx)
}
pub fn remove_block_range(
&self,
range: impl RangeBounds<BlockNumber> + Clone,
) -> ProviderResult<()> {
let block_headers = self.remove::<tables::Headers>(range.clone())?;
if block_headers == 0 {
return Ok(())
}
self.tx.unwind_table_by_walker::<tables::CanonicalHeaders, tables::HeaderNumbers>(
range.clone(),
)?;
self.remove::<tables::CanonicalHeaders>(range.clone())?;
self.remove::<tables::BlockOmmers>(range.clone())?;
self.remove::<tables::BlockWithdrawals>(range.clone())?;
self.remove_block_transaction_range(range.clone())?;
self.remove::<tables::HeaderTerminalDifficulties>(range)?;
Ok(())
}
pub fn take_block_range(
&self,
range: impl RangeBounds<BlockNumber> + Clone,
) -> ProviderResult<Vec<SealedBlockWithSenders>>
where
N::ChainSpec: EthereumHardforks,
{
let block_headers = self.take::<tables::Headers>(range.clone())?;
if block_headers.is_empty() {
return Ok(Vec::new())
}
self.tx.unwind_table_by_walker::<tables::CanonicalHeaders, tables::HeaderNumbers>(
range.clone(),
)?;
let block_header_hashes = self.take::<tables::CanonicalHeaders>(range.clone())?;
let block_ommers = self.take::<tables::BlockOmmers>(range.clone())?;
let block_withdrawals = self.take::<tables::BlockWithdrawals>(range.clone())?;
let block_tx = self.take_block_transaction_range(range.clone())?;
let mut blocks = Vec::with_capacity(block_headers.len());
self.remove::<tables::HeaderTerminalDifficulties>(range)?;
let block_header_iter = block_headers.into_iter();
let block_header_hashes_iter = block_header_hashes.into_iter();
let block_tx_iter = block_tx.into_iter();
let mut block_ommers_iter = block_ommers.into_iter();
let mut block_withdrawals_iter = block_withdrawals.into_iter();
let mut block_ommers = block_ommers_iter.next();
let mut block_withdrawals = block_withdrawals_iter.next();
for ((main_block_number, header), (_, header_hash), (_, tx)) in
izip!(block_header_iter, block_header_hashes_iter, block_tx_iter)
{
let header = SealedHeader::new(header, header_hash);
let (transactions, senders) = tx.into_iter().map(|tx| tx.to_components()).unzip();
let mut ommers = Vec::new();
if let Some((block_number, _)) = block_ommers.as_ref() {
if *block_number == main_block_number {
ommers = block_ommers.take().unwrap().1.ommers;
block_ommers = block_ommers_iter.next();
}
};
let shanghai_is_active =
self.chain_spec.is_shanghai_active_at_timestamp(header.timestamp);
let mut withdrawals = Some(Withdrawals::default());
if shanghai_is_active {
if let Some((block_number, _)) = block_withdrawals.as_ref() {
if *block_number == main_block_number {
withdrawals = Some(block_withdrawals.take().unwrap().1.withdrawals);
block_withdrawals = block_withdrawals_iter.next();
}
}
} else {
withdrawals = None
}
blocks.push(SealedBlockWithSenders {
block: SealedBlock {
header,
body: BlockBody { transactions, ommers, withdrawals },
},
senders,
})
}
Ok(blocks)
}
fn take_shard<T>(&self, key: T::Key) -> ProviderResult<Vec<u64>>
where
T: Table<Value = BlockNumberList>,
{
let mut cursor = self.tx.cursor_read::<T>()?;
let shard = cursor.seek_exact(key)?;
if let Some((shard_key, list)) = shard {
self.tx.delete::<T>(shard_key, None)?;
let list = list.iter().collect::<Vec<_>>();
return Ok(list)
}
Ok(Vec::new())
}
fn append_history_index<P, T>(
&self,
index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
) -> ProviderResult<()>
where
P: Copy,
T: Table<Value = BlockNumberList>,
{
for (partial_key, indices) in index_updates {
let mut last_shard =
self.take_shard::<T>(sharded_key_factory(partial_key, u64::MAX))?;
last_shard.extend(indices);
let indices = last_shard;
let mut chunks = indices.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD).peekable();
while let Some(list) = chunks.next() {
let highest_block_number = if chunks.peek().is_some() {
*list.last().expect("`chunks` does not return empty list")
} else {
u64::MAX
};
self.tx.put::<T>(
sharded_key_factory(partial_key, highest_block_number),
BlockNumberList::new_pre_sorted(list.iter().copied()),
)?;
}
}
Ok(())
}
}
impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
fn basic_account(&self, address: Address) -> ProviderResult<Option<Account>> {
Ok(self.tx.get::<tables::PlainAccountState>(address)?)
}
}
impl<TX: DbTx, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
fn changed_accounts_with_range(
&self,
range: impl RangeBounds<BlockNumber>,
) -> ProviderResult<BTreeSet<Address>> {
self.tx
.cursor_read::<tables::AccountChangeSets>()?
.walk_range(range)?
.map(|entry| {
entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
})
.collect()
}
fn basic_accounts(
&self,
iter: impl IntoIterator<Item = Address>,
) -> ProviderResult<Vec<(Address, Option<Account>)>> {
let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
Ok(iter
.into_iter()
.map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
.collect::<Result<Vec<_>, _>>()?)
}
fn changed_accounts_and_blocks_with_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
BTreeMap::new(),
|mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
let (index, account) = entry?;
accounts.entry(account.address).or_default().push(index);
Ok(accounts)
},
)?;
Ok(account_transitions)
}
}
impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
fn storage_changeset(
&self,
block_number: BlockNumber,
) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
let range = block_number..=block_number;
let storage_range = BlockNumberAddress::range(range);
self.tx
.cursor_dup_read::<tables::StorageChangeSets>()?
.walk_range(storage_range)?
.map(|result| -> ProviderResult<_> { Ok(result?) })
.collect()
}
}
impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
fn account_block_changeset(
&self,
block_number: BlockNumber,
) -> ProviderResult<Vec<AccountBeforeTx>> {
let range = block_number..=block_number;
self.tx
.cursor_read::<tables::AccountChangeSets>()?
.walk_range(range)?
.map(|result| -> ProviderResult<_> {
let (_, account_before) = result?;
Ok(account_before)
})
.collect()
}
}
impl<TX: DbTx + 'static, N: NodeTypes> HeaderSyncGapProvider for DatabaseProvider<TX, N> {
fn sync_gap(
&self,
tip: watch::Receiver<B256>,
highest_uninterrupted_block: BlockNumber,
) -> ProviderResult<HeaderSyncGap> {
let static_file_provider = self.static_file_provider();
let next_static_file_block_num = static_file_provider
.get_highest_static_file_block(StaticFileSegment::Headers)
.map(|id| id + 1)
.unwrap_or_default();
let next_block = highest_uninterrupted_block + 1;
match next_static_file_block_num.cmp(&next_block) {
Ordering::Greater => {
let mut static_file_producer =
static_file_provider.latest_writer(StaticFileSegment::Headers)?;
static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
static_file_producer.commit()?
}
Ordering::Less => {
return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
}
Ordering::Equal => {}
}
let local_head = static_file_provider
.sealed_header(highest_uninterrupted_block)?
.ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
let target = SyncTarget::Tip(*tip.borrow());
Ok(HeaderSyncGap { local_head, target })
}
}
impl<TX: DbTx + 'static, N: NodeTypes<ChainSpec: EthereumHardforks>> HeaderProvider
for DatabaseProvider<TX, N>
{
fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Header>> {
if let Some(num) = self.block_number(*block_hash)? {
Ok(self.header_by_number(num)?)
} else {
Ok(None)
}
}
fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Header>> {
self.static_file_provider.get_with_static_file_or_database(
StaticFileSegment::Headers,
num,
|static_file| static_file.header_by_number(num),
|| Ok(self.tx.get::<tables::Headers>(num)?),
)
}
fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
if let Some(num) = self.block_number(*block_hash)? {
self.header_td_by_number(num)
} else {
Ok(None)
}
}
fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
if let Some(td) = self.chain_spec.final_paris_total_difficulty(number) {
return Ok(Some(td))
}
self.static_file_provider.get_with_static_file_or_database(
StaticFileSegment::Headers,
number,
|static_file| static_file.header_td_by_number(number),
|| Ok(self.tx.get::<tables::HeaderTerminalDifficulties>(number)?.map(|td| td.0)),
)
}
fn headers_range(&self, range: impl RangeBounds<BlockNumber>) -> ProviderResult<Vec<Header>> {
self.static_file_provider.get_range_with_static_file_or_database(
StaticFileSegment::Headers,
to_range(range),
|static_file, range, _| static_file.headers_range(range),
|range, _| self.cursor_read_collect::<tables::Headers>(range).map_err(Into::into),
|_| true,
)
}
fn sealed_header(&self, number: BlockNumber) -> ProviderResult<Option<SealedHeader>> {
self.static_file_provider.get_with_static_file_or_database(
StaticFileSegment::Headers,
number,
|static_file| static_file.sealed_header(number),
|| {
if let Some(header) = self.header_by_number(number)? {
let hash = self
.block_hash(number)?
.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
Ok(Some(SealedHeader::new(header, hash)))
} else {
Ok(None)
}
},
)
}
fn sealed_headers_while(
&self,
range: impl RangeBounds<BlockNumber>,
predicate: impl FnMut(&SealedHeader) -> bool,
) -> ProviderResult<Vec<SealedHeader>> {
self.static_file_provider.get_range_with_static_file_or_database(
StaticFileSegment::Headers,
to_range(range),
|static_file, range, predicate| static_file.sealed_headers_while(range, predicate),
|range, mut predicate| {
let mut headers = vec![];
for entry in self.tx.cursor_read::<tables::Headers>()?.walk_range(range)? {
let (number, header) = entry?;
let hash = self
.block_hash(number)?
.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
let sealed = SealedHeader::new(header, hash);
if !predicate(&sealed) {
break
}
headers.push(sealed);
}
Ok(headers)
},
predicate,
)
}
}
impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
self.static_file_provider.get_with_static_file_or_database(
StaticFileSegment::Headers,
number,
|static_file| static_file.block_hash(number),
|| Ok(self.tx.get::<tables::CanonicalHeaders>(number)?),
)
}
fn canonical_hashes_range(
&self,
start: BlockNumber,
end: BlockNumber,
) -> ProviderResult<Vec<B256>> {
self.static_file_provider.get_range_with_static_file_or_database(
StaticFileSegment::Headers,
start..end,
|static_file, range, _| static_file.canonical_hashes_range(range.start, range.end),
|range, _| {
self.cursor_read_collect::<tables::CanonicalHeaders>(range).map_err(Into::into)
},
|_| true,
)
}
}
impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
fn chain_info(&self) -> ProviderResult<ChainInfo> {
let best_number = self.best_block_number()?;
let best_hash = self.block_hash(best_number)?.unwrap_or_default();
Ok(ChainInfo { best_hash, best_number })
}
fn best_block_number(&self) -> ProviderResult<BlockNumber> {
Ok(self
.get_stage_checkpoint(StageId::Finish)?
.map(|checkpoint| checkpoint.block_number)
.unwrap_or_default())
}
fn last_block_number(&self) -> ProviderResult<BlockNumber> {
Ok(self
.tx
.cursor_read::<tables::CanonicalHeaders>()?
.last()?
.map(|(num, _)| num)
.max(
self.static_file_provider.get_highest_static_file_block(StaticFileSegment::Headers),
)
.unwrap_or_default())
}
fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
}
}
impl<TX: DbTx + 'static, N: NodeTypes<ChainSpec: EthereumHardforks>> BlockReader
for DatabaseProvider<TX, N>
{
fn find_block_by_hash(&self, hash: B256, source: BlockSource) -> ProviderResult<Option<Block>> {
if source.is_canonical() {
self.block(hash.into())
} else {
Ok(None)
}
}
fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Block>> {
if let Some(number) = self.convert_hash_or_number(id)? {
if let Some(header) = self.header_by_number(number)? {
let withdrawals = self.withdrawals_by_block(number.into(), header.timestamp)?;
let ommers = self.ommers(number.into())?.unwrap_or_default();
let transactions = match self.transactions_by_block(number.into())? {
Some(transactions) => transactions,
None => return Ok(None),
};
return Ok(Some(Block {
header,
body: BlockBody { transactions, ommers, withdrawals },
}))
}
}
Ok(None)
}
fn pending_block(&self) -> ProviderResult<Option<SealedBlock>> {
Ok(None)
}
fn pending_block_with_senders(&self) -> ProviderResult<Option<SealedBlockWithSenders>> {
Ok(None)
}
fn pending_block_and_receipts(&self) -> ProviderResult<Option<(SealedBlock, Vec<Receipt>)>> {
Ok(None)
}
fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<Header>>> {
if let Some(number) = self.convert_hash_or_number(id)? {
if self.chain_spec.final_paris_total_difficulty(number).is_some() {
return Ok(Some(Vec::new()))
}
let ommers = self.tx.get::<tables::BlockOmmers>(number)?.map(|o| o.ommers);
return Ok(ommers)
}
Ok(None)
}
fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
}
fn block_with_senders(
&self,
id: BlockHashOrNumber,
transaction_kind: TransactionVariant,
) -> ProviderResult<Option<BlockWithSenders>> {
self.block_with_senders(
id,
transaction_kind,
|block_number| self.header_by_number(block_number),
|header, transactions, senders, ommers, withdrawals| {
Block { header, body: BlockBody { transactions, ommers, withdrawals } }
.try_with_senders_unchecked(senders)
.map(Some)
.map_err(|_| ProviderError::SenderRecoveryError)
},
)
}
fn sealed_block_with_senders(
&self,
id: BlockHashOrNumber,
transaction_kind: TransactionVariant,
) -> ProviderResult<Option<SealedBlockWithSenders>> {
self.block_with_senders(
id,
transaction_kind,
|block_number| self.sealed_header(block_number),
|header, transactions, senders, ommers, withdrawals| {
SealedBlock { header, body: BlockBody { transactions, ommers, withdrawals } }
.try_with_senders_unchecked(senders)
.map(Some)
.map_err(|_| ProviderError::SenderRecoveryError)
},
)
}
fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions>()?;
self.block_range(
range,
|range| self.headers_range(range),
|header, tx_range, ommers, withdrawals| {
let transactions = if tx_range.is_empty() {
Vec::new()
} else {
self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)?
.into_iter()
.map(Into::into)
.collect()
};
Ok(Block { header, body: BlockBody { transactions, ommers, withdrawals } })
},
)
}
fn block_with_senders_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<BlockWithSenders>> {
self.block_with_senders_range(
range,
|range| self.headers_range(range),
|header, transactions, ommers, withdrawals, senders| {
Block { header, body: BlockBody { transactions, ommers, withdrawals } }
.try_with_senders_unchecked(senders)
.map_err(|_| ProviderError::SenderRecoveryError)
},
)
}
fn sealed_block_with_senders_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<SealedBlockWithSenders>> {
self.block_with_senders_range(
range,
|range| self.sealed_headers_range(range),
|header, transactions, ommers, withdrawals, senders| {
SealedBlockWithSenders::new(
SealedBlock { header, body: BlockBody { transactions, ommers, withdrawals } },
senders,
)
.ok_or(ProviderError::SenderRecoveryError)
},
)
}
}
impl<TX: DbTx + 'static, N: NodeTypes<ChainSpec: EthereumHardforks>> TransactionsProviderExt
for DatabaseProvider<TX, N>
{
fn transaction_hashes_by_range(
&self,
tx_range: Range<TxNumber>,
) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
self.static_file_provider.get_range_with_static_file_or_database(
StaticFileSegment::Transactions,
tx_range,
|static_file, range, _| static_file.transaction_hashes_by_range(range),
|tx_range, _| {
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions>()?;
let tx_range_size = tx_range.clone().count();
let tx_walker = tx_cursor.walk_range(tx_range)?;
let chunk_size = (tx_range_size / rayon::current_num_threads()).max(1);
let mut channels = Vec::with_capacity(chunk_size);
let mut transaction_count = 0;
#[inline]
fn calculate_hash(
entry: Result<(TxNumber, TransactionSignedNoHash), DatabaseError>,
rlp_buf: &mut Vec<u8>,
) -> Result<(B256, TxNumber), Box<ProviderError>> {
let (tx_id, tx) = entry.map_err(|e| Box::new(e.into()))?;
tx.transaction.eip2718_encode(&tx.signature, rlp_buf);
Ok((keccak256(rlp_buf), tx_id))
}
for chunk in &tx_walker.chunks(chunk_size) {
let (tx, rx) = mpsc::channel();
channels.push(rx);
let chunk: Vec<_> = chunk.collect();
transaction_count += chunk.len();
rayon::spawn(move || {
let mut rlp_buf = Vec::with_capacity(128);
for entry in chunk {
rlp_buf.clear();
let _ = tx.send(calculate_hash(entry, &mut rlp_buf));
}
});
}
let mut tx_list = Vec::with_capacity(transaction_count);
for channel in channels {
while let Ok(tx) = channel.recv() {
let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
tx_list.push((tx_hash, tx_id));
}
}
Ok(tx_list)
},
|_| true,
)
}
}
impl<TX: DbTx + 'static, N: NodeTypes<ChainSpec: EthereumHardforks>> TransactionsProvider
for DatabaseProvider<TX, N>
{
fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
Ok(self.tx.get::<tables::TransactionHashNumbers>(tx_hash)?)
}
fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<TransactionSigned>> {
self.static_file_provider.get_with_static_file_or_database(
StaticFileSegment::Transactions,
id,
|static_file| static_file.transaction_by_id(id),
|| Ok(self.tx.get::<tables::Transactions>(id)?.map(Into::into)),
)
}
fn transaction_by_id_no_hash(
&self,
id: TxNumber,
) -> ProviderResult<Option<TransactionSignedNoHash>> {
self.static_file_provider.get_with_static_file_or_database(
StaticFileSegment::Transactions,
id,
|static_file| static_file.transaction_by_id_no_hash(id),
|| Ok(self.tx.get::<tables::Transactions>(id)?),
)
}
fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<TransactionSigned>> {
if let Some(id) = self.transaction_id(hash)? {
Ok(self.transaction_by_id_no_hash(id)?.map(|tx| TransactionSigned {
hash,
signature: tx.signature,
transaction: tx.transaction,
}))
} else {
Ok(None)
}
.map(|tx| tx.map(Into::into))
}
fn transaction_by_hash_with_meta(
&self,
tx_hash: TxHash,
) -> ProviderResult<Option<(TransactionSigned, TransactionMeta)>> {
let mut transaction_cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
if let Some(transaction_id) = self.transaction_id(tx_hash)? {
if let Some(tx) = self.transaction_by_id_no_hash(transaction_id)? {
let transaction = TransactionSigned {
hash: tx_hash,
signature: tx.signature,
transaction: tx.transaction,
};
if let Some(block_number) =
transaction_cursor.seek(transaction_id).map(|b| b.map(|(_, bn)| bn))?
{
if let Some(sealed_header) = self.sealed_header(block_number)? {
let (header, block_hash) = sealed_header.split();
if let Some(block_body) = self.block_body_indices(block_number)? {
let index = transaction_id - block_body.first_tx_num();
let meta = TransactionMeta {
tx_hash,
index,
block_hash,
block_number,
base_fee: header.base_fee_per_gas,
excess_blob_gas: header.excess_blob_gas,
timestamp: header.timestamp,
};
return Ok(Some((transaction, meta)))
}
}
}
}
}
Ok(None)
}
fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
let mut cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
Ok(cursor.seek(id)?.map(|(_, bn)| bn))
}
fn transactions_by_block(
&self,
id: BlockHashOrNumber,
) -> ProviderResult<Option<Vec<TransactionSigned>>> {
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions>()?;
if let Some(block_number) = self.convert_hash_or_number(id)? {
if let Some(body) = self.block_body_indices(block_number)? {
let tx_range = body.tx_num_range();
return if tx_range.is_empty() {
Ok(Some(Vec::new()))
} else {
Ok(Some(
self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)?
.into_iter()
.map(Into::into)
.collect(),
))
}
}
}
Ok(None)
}
fn transactions_by_block_range(
&self,
range: impl RangeBounds<BlockNumber>,
) -> ProviderResult<Vec<Vec<TransactionSigned>>> {
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions>()?;
let mut results = Vec::new();
let mut body_cursor = self.tx.cursor_read::<tables::BlockBodyIndices>()?;
for entry in body_cursor.walk_range(range)? {
let (_, body) = entry?;
let tx_num_range = body.tx_num_range();
if tx_num_range.is_empty() {
results.push(Vec::new());
} else {
results.push(
self.transactions_by_tx_range_with_cursor(tx_num_range, &mut tx_cursor)?
.into_iter()
.map(Into::into)
.collect(),
);
}
}
Ok(results)
}
fn transactions_by_tx_range(
&self,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<TransactionSignedNoHash>> {
self.transactions_by_tx_range_with_cursor(
range,
&mut self.tx.cursor_read::<tables::Transactions>()?,
)
}
fn senders_by_tx_range(
&self,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<Address>> {
self.cursor_read_collect::<tables::TransactionSenders>(range).map_err(Into::into)
}
fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
Ok(self.tx.get::<tables::TransactionSenders>(id)?)
}
}
impl<TX: DbTx + 'static, N: NodeTypes<ChainSpec: EthereumHardforks>> ReceiptProvider
for DatabaseProvider<TX, N>
{
fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Receipt>> {
self.static_file_provider.get_with_static_file_or_database(
StaticFileSegment::Receipts,
id,
|static_file| static_file.receipt(id),
|| Ok(self.tx.get::<tables::Receipts>(id)?),
)
}
fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Receipt>> {
if let Some(id) = self.transaction_id(hash)? {
self.receipt(id)
} else {
Ok(None)
}
}
fn receipts_by_block(&self, block: BlockHashOrNumber) -> ProviderResult<Option<Vec<Receipt>>> {
if let Some(number) = self.convert_hash_or_number(block)? {
if let Some(body) = self.block_body_indices(number)? {
let tx_range = body.tx_num_range();
return if tx_range.is_empty() {
Ok(Some(Vec::new()))
} else {
self.receipts_by_tx_range(tx_range).map(Some)
}
}
}
Ok(None)
}
fn receipts_by_tx_range(
&self,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<Receipt>> {
self.static_file_provider.get_range_with_static_file_or_database(
StaticFileSegment::Receipts,
to_range(range),
|static_file, range, _| static_file.receipts_by_tx_range(range),
|range, _| self.cursor_read_collect::<tables::Receipts>(range).map_err(Into::into),
|_| true,
)
}
}
impl<TX: DbTx + 'static, N: NodeTypes<ChainSpec: EthereumHardforks>> WithdrawalsProvider
for DatabaseProvider<TX, N>
{
fn withdrawals_by_block(
&self,
id: BlockHashOrNumber,
timestamp: u64,
) -> ProviderResult<Option<Withdrawals>> {
if self.chain_spec.is_shanghai_active_at_timestamp(timestamp) {
if let Some(number) = self.convert_hash_or_number(id)? {
let withdrawals = self
.tx
.get::<tables::BlockWithdrawals>(number)
.map(|w| w.map(|w| w.withdrawals))?
.unwrap_or_default();
return Ok(Some(withdrawals))
}
}
Ok(None)
}
fn latest_withdrawal(&self) -> ProviderResult<Option<Withdrawal>> {
let latest_block_withdrawal = self.tx.cursor_read::<tables::BlockWithdrawals>()?.last()?;
Ok(latest_block_withdrawal
.and_then(|(_, mut block_withdrawal)| block_withdrawal.withdrawals.pop()))
}
}
impl<TX: DbTx + 'static, N: NodeTypes<ChainSpec: EthereumHardforks>> EvmEnvProvider
for DatabaseProvider<TX, N>
{
fn fill_env_at<EvmConfig>(
&self,
cfg: &mut CfgEnvWithHandlerCfg,
block_env: &mut BlockEnv,
at: BlockHashOrNumber,
evm_config: EvmConfig,
) -> ProviderResult<()>
where
EvmConfig: ConfigureEvmEnv<Header = Header>,
{
let hash = self.convert_number(at)?.ok_or(ProviderError::HeaderNotFound(at))?;
let header = self.header(&hash)?.ok_or(ProviderError::HeaderNotFound(at))?;
self.fill_env_with_header(cfg, block_env, &header, evm_config)
}
fn fill_env_with_header<EvmConfig>(
&self,
cfg: &mut CfgEnvWithHandlerCfg,
block_env: &mut BlockEnv,
header: &Header,
evm_config: EvmConfig,
) -> ProviderResult<()>
where
EvmConfig: ConfigureEvmEnv<Header = Header>,
{
let total_difficulty = self
.header_td_by_number(header.number)?
.ok_or_else(|| ProviderError::HeaderNotFound(header.number.into()))?;
evm_config.fill_cfg_and_block_env(cfg, block_env, header, total_difficulty);
Ok(())
}
fn fill_cfg_env_at<EvmConfig>(
&self,
cfg: &mut CfgEnvWithHandlerCfg,
at: BlockHashOrNumber,
evm_config: EvmConfig,
) -> ProviderResult<()>
where
EvmConfig: ConfigureEvmEnv<Header = Header>,
{
let hash = self.convert_number(at)?.ok_or(ProviderError::HeaderNotFound(at))?;
let header = self.header(&hash)?.ok_or(ProviderError::HeaderNotFound(at))?;
self.fill_cfg_env_with_header(cfg, &header, evm_config)
}
fn fill_cfg_env_with_header<EvmConfig>(
&self,
cfg: &mut CfgEnvWithHandlerCfg,
header: &Header,
evm_config: EvmConfig,
) -> ProviderResult<()>
where
EvmConfig: ConfigureEvmEnv<Header = Header>,
{
let total_difficulty = self
.header_td_by_number(header.number)?
.ok_or_else(|| ProviderError::HeaderNotFound(header.number.into()))?;
evm_config.fill_cfg_env(cfg, header, total_difficulty);
Ok(())
}
}
impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
Ok(self.tx.get::<tables::StageCheckpoints>(id.to_string())?)
}
fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
}
fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
self.tx
.cursor_read::<tables::StageCheckpoints>()?
.walk(None)?
.collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
.map_err(ProviderError::Database)
}
}
impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
fn save_stage_checkpoint(
&self,
id: StageId,
checkpoint: StageCheckpoint,
) -> ProviderResult<()> {
Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
}
fn save_stage_checkpoint_progress(
&self,
id: StageId,
checkpoint: Vec<u8>,
) -> ProviderResult<()> {
Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
}
fn update_pipeline_stages(
&self,
block_number: BlockNumber,
drop_stage_checkpoint: bool,
) -> ProviderResult<()> {
let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
for stage_id in StageId::ALL {
let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
cursor.upsert(
stage_id.to_string(),
StageCheckpoint {
block_number,
..if drop_stage_checkpoint { Default::default() } else { checkpoint }
},
)?;
}
Ok(())
}
}
impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
fn plain_state_storages(
&self,
addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
addresses_with_keys
.into_iter()
.map(|(address, storage)| {
storage
.into_iter()
.map(|key| -> ProviderResult<_> {
Ok(plain_storage
.seek_by_key_subkey(address, key)?
.filter(|v| v.key == key)
.unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
})
.collect::<ProviderResult<Vec<_>>>()
.map(|storage| (address, storage))
})
.collect::<ProviderResult<Vec<(_, _)>>>()
}
fn changed_storages_with_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
self.tx
.cursor_read::<tables::StorageChangeSets>()?
.walk_range(BlockNumberAddress::range(range))?
.try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
let (BlockNumberAddress((_, address)), storage_entry) = entry?;
accounts.entry(address).or_default().insert(storage_entry.key);
Ok(accounts)
})
}
fn changed_storages_and_blocks_with_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
let storage_changeset_lists =
changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
BTreeMap::new(),
|mut storages: BTreeMap<(Address, B256), Vec<u64>>, entry| -> ProviderResult<_> {
let (index, storage) = entry?;
storages
.entry((index.address(), storage.key))
.or_default()
.push(index.block_number());
Ok(storages)
},
)?;
Ok(storage_changeset_lists)
}
}
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StateChangeWriter for DatabaseProvider<TX, N> {
fn write_state_reverts(
&self,
reverts: PlainStateReverts,
first_block: BlockNumber,
) -> ProviderResult<()> {
tracing::trace!("Writing storage changes");
let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
let mut storage_changeset_cursor =
self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
let block_number = first_block + block_index as BlockNumber;
tracing::trace!(block_number, "Writing block change");
storage_changes.par_sort_unstable_by_key(|a| a.address);
for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
let storage_id = BlockNumberAddress((block_number, address));
let mut storage = storage_revert
.into_iter()
.map(|(k, v)| (B256::new(k.to_be_bytes()), v))
.collect::<Vec<_>>();
storage.par_sort_unstable_by_key(|a| a.0);
let mut wiped_storage = Vec::new();
if wiped {
tracing::trace!(?address, "Wiping storage");
if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
wiped_storage.push((entry.key, entry.value));
while let Some(entry) = storages_cursor.next_dup_val()? {
wiped_storage.push((entry.key, entry.value))
}
}
}
tracing::trace!(?address, ?storage, "Writing storage reverts");
for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
storage_changeset_cursor.append_dup(storage_id, StorageEntry { key, value })?;
}
}
}
tracing::trace!("Writing account changes");
let mut account_changeset_cursor =
self.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?;
for (block_index, mut account_block_reverts) in reverts.accounts.into_iter().enumerate() {
let block_number = first_block + block_index as BlockNumber;
account_block_reverts.par_sort_by_key(|a| a.0);
for (address, info) in account_block_reverts {
account_changeset_cursor.append_dup(
block_number,
AccountBeforeTx { address, info: info.map(Into::into) },
)?;
}
}
Ok(())
}
fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
changes.accounts.par_sort_by_key(|a| a.0);
changes.storage.par_sort_by_key(|a| a.address);
changes.contracts.par_sort_by_key(|a| a.0);
tracing::trace!(len = changes.accounts.len(), "Writing new account state");
let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
for (address, account) in changes.accounts {
if let Some(account) = account {
tracing::trace!(?address, "Updating plain state account");
accounts_cursor.upsert(address, account.into())?;
} else if accounts_cursor.seek_exact(address)?.is_some() {
tracing::trace!(?address, "Deleting plain state account");
accounts_cursor.delete_current()?;
}
}
tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
for (hash, bytecode) in changes.contracts {
bytecodes_cursor.upsert(hash, Bytecode(bytecode))?;
}
tracing::trace!(len = changes.storage.len(), "Writing new storage state");
let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
storages_cursor.delete_current_duplicates()?;
}
let mut storage = storage
.into_iter()
.map(|(k, value)| StorageEntry { key: k.into(), value })
.collect::<Vec<_>>();
storage.par_sort_unstable_by_key(|a| a.key);
for entry in storage {
tracing::trace!(?address, ?entry.key, "Updating plain state storage");
if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? {
if db_entry.key == entry.key {
storages_cursor.delete_current()?;
}
}
if !entry.value.is_zero() {
storages_cursor.upsert(address, entry)?;
}
}
}
Ok(())
}
fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
for (hashed_address, account) in hashed_state.accounts().accounts_sorted() {
if let Some(account) = account {
hashed_accounts_cursor.upsert(hashed_address, account)?;
} else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
hashed_accounts_cursor.delete_current()?;
}
}
let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
let mut hashed_storage_cursor =
self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
for (hashed_address, storage) in sorted_storages {
if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
hashed_storage_cursor.delete_current_duplicates()?;
}
for (hashed_slot, value) in storage.storage_slots_sorted() {
let entry = StorageEntry { key: hashed_slot, value };
if let Some(db_entry) =
hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)?
{
if db_entry.key == entry.key {
hashed_storage_cursor.delete_current()?;
}
}
if !entry.value.is_zero() {
hashed_storage_cursor.upsert(*hashed_address, entry)?;
}
}
}
Ok(())
}
fn remove_state(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
if range.is_empty() {
return Ok(())
}
let block_bodies = self.get::<tables::BlockBodyIndices>(range.clone())?;
let from_transaction_num =
block_bodies.first().expect("already checked if there are blocks").1.first_tx_num();
let to_transaction_num =
block_bodies.last().expect("already checked if there are blocks").1.last_tx_num();
let storage_range = BlockNumberAddress::range(range.clone());
let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
let (state, _) = self.populate_bundle_state(
account_changeset,
storage_changeset,
&mut plain_accounts_cursor,
&mut plain_storage_cursor,
)?;
for (address, (old_account, new_account, storage)) in &state {
if old_account != new_account {
let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
if let Some(account) = old_account {
plain_accounts_cursor.upsert(*address, *account)?;
} else if existing_entry.is_some() {
plain_accounts_cursor.delete_current()?;
}
}
for (storage_key, (old_storage_value, _new_storage_value)) in storage {
let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
if plain_storage_cursor
.seek_by_key_subkey(*address, *storage_key)?
.filter(|s| s.key == *storage_key)
.is_some()
{
plain_storage_cursor.delete_current()?
}
if !old_storage_value.is_zero() {
plain_storage_cursor.upsert(*address, storage_entry)?;
}
}
}
self.remove::<tables::Receipts>(from_transaction_num..=to_transaction_num)?;
Ok(())
}
fn take_state(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<ExecutionOutcome> {
if range.is_empty() {
return Ok(ExecutionOutcome::default())
}
let start_block_number = *range.start();
let block_bodies = self.get::<tables::BlockBodyIndices>(range.clone())?;
let from_transaction_num =
block_bodies.first().expect("already checked if there are blocks").1.first_tx_num();
let to_transaction_num =
block_bodies.last().expect("already checked if there are blocks").1.last_tx_num();
let storage_range = BlockNumberAddress::range(range.clone());
let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
let (state, reverts) = self.populate_bundle_state(
account_changeset,
storage_changeset,
&mut plain_accounts_cursor,
&mut plain_storage_cursor,
)?;
for (address, (old_account, new_account, storage)) in &state {
if old_account != new_account {
let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
if let Some(account) = old_account {
plain_accounts_cursor.upsert(*address, *account)?;
} else if existing_entry.is_some() {
plain_accounts_cursor.delete_current()?;
}
}
for (storage_key, (old_storage_value, _new_storage_value)) in storage {
let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
if plain_storage_cursor
.seek_by_key_subkey(*address, *storage_key)?
.filter(|s| s.key == *storage_key)
.is_some()
{
plain_storage_cursor.delete_current()?
}
if !old_storage_value.is_zero() {
plain_storage_cursor.upsert(*address, storage_entry)?;
}
}
}
let mut receipt_iter =
self.take::<tables::Receipts>(from_transaction_num..=to_transaction_num)?.into_iter();
let mut receipts = Vec::with_capacity(block_bodies.len());
for (_, block_body) in block_bodies {
let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
for _ in block_body.tx_num_range() {
if let Some((_, receipt)) = receipt_iter.next() {
block_receipts.push(Some(receipt));
}
}
receipts.push(block_receipts);
}
Ok(ExecutionOutcome::new_init(
state,
reverts,
Vec::new(),
receipts.into(),
start_block_number,
Vec::new(),
))
}
}
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
fn write_trie_updates(&self, trie_updates: &TrieUpdates) -> ProviderResult<usize> {
if trie_updates.is_empty() {
return Ok(0)
}
let mut num_entries = 0;
let mut account_updates = trie_updates
.removed_nodes_ref()
.iter()
.filter_map(|n| {
(!trie_updates.account_nodes_ref().contains_key(n)).then_some((n, None))
})
.collect::<Vec<_>>();
account_updates.extend(
trie_updates.account_nodes_ref().iter().map(|(nibbles, node)| (nibbles, Some(node))),
);
account_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
let tx = self.tx_ref();
let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
for (key, updated_node) in account_updates {
let nibbles = StoredNibbles(key.clone());
match updated_node {
Some(node) => {
if !nibbles.0.is_empty() {
num_entries += 1;
account_trie_cursor.upsert(nibbles, node.clone())?;
}
}
None => {
num_entries += 1;
if account_trie_cursor.seek_exact(nibbles)?.is_some() {
account_trie_cursor.delete_current()?;
}
}
}
}
num_entries += self.write_storage_trie_updates(trie_updates.storage_tries_ref())?;
Ok(num_entries)
}
}
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
fn write_storage_trie_updates(
&self,
storage_tries: &HashMap<B256, StorageTrieUpdates>,
) -> ProviderResult<usize> {
let mut num_entries = 0;
let mut storage_tries = Vec::from_iter(storage_tries);
storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
for (hashed_address, storage_trie_updates) in storage_tries {
let mut db_storage_trie_cursor =
DatabaseStorageTrieCursor::new(cursor, *hashed_address);
num_entries +=
db_storage_trie_cursor.write_storage_trie_updates(storage_trie_updates)?;
cursor = db_storage_trie_cursor.cursor;
}
Ok(num_entries)
}
fn write_individual_storage_trie_updates(
&self,
hashed_address: B256,
updates: &StorageTrieUpdates,
) -> ProviderResult<usize> {
if updates.is_empty() {
return Ok(0)
}
let cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
let mut trie_db_cursor = DatabaseStorageTrieCursor::new(cursor, hashed_address);
Ok(trie_db_cursor.write_storage_trie_updates(updates)?)
}
}
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
fn unwind_account_hashing<'a>(
&self,
changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
let hashed_accounts = changesets
.into_iter()
.map(|(_, e)| (keccak256(e.address), e.info))
.collect::<Vec<_>>()
.into_iter()
.rev()
.collect::<BTreeMap<_, _>>();
let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
for (hashed_address, account) in &hashed_accounts {
if let Some(account) = account {
hashed_accounts_cursor.upsert(*hashed_address, *account)?;
} else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
hashed_accounts_cursor.delete_current()?;
}
}
Ok(hashed_accounts)
}
fn unwind_account_hashing_range(
&self,
range: impl RangeBounds<BlockNumber>,
) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
let changesets = self
.tx
.cursor_read::<tables::AccountChangeSets>()?
.walk_range(range)?
.collect::<Result<Vec<_>, _>>()?;
self.unwind_account_hashing(changesets.iter())
}
fn insert_account_for_hashing(
&self,
changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
let hashed_accounts =
changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
for (hashed_address, account) in &hashed_accounts {
if let Some(account) = account {
hashed_accounts_cursor.upsert(*hashed_address, *account)?;
} else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
hashed_accounts_cursor.delete_current()?;
}
}
Ok(hashed_accounts)
}
fn unwind_storage_hashing(
&self,
changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
let mut hashed_storages = changesets
.into_iter()
.map(|(BlockNumberAddress((_, address)), storage_entry)| {
(keccak256(address), keccak256(storage_entry.key), storage_entry.value)
})
.collect::<Vec<_>>();
hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
HashMap::with_capacity(hashed_storages.len());
let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
hashed_storage_keys.entry(hashed_address).or_default().insert(key);
if hashed_storage
.seek_by_key_subkey(hashed_address, key)?
.filter(|entry| entry.key == key)
.is_some()
{
hashed_storage.delete_current()?;
}
if !value.is_zero() {
hashed_storage.upsert(hashed_address, StorageEntry { key, value })?;
}
}
Ok(hashed_storage_keys)
}
fn unwind_storage_hashing_range(
&self,
range: impl RangeBounds<BlockNumberAddress>,
) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
let changesets = self
.tx
.cursor_read::<tables::StorageChangeSets>()?
.walk_range(range)?
.collect::<Result<Vec<_>, _>>()?;
self.unwind_storage_hashing(changesets.into_iter())
}
fn insert_storage_for_hashing(
&self,
storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
let hashed_storages =
storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
map.insert(keccak256(entry.key), entry.value);
map
});
map.insert(keccak256(address), storage);
map
});
let hashed_storage_keys = hashed_storages
.iter()
.map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
.collect();
let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
if hashed_storage_cursor
.seek_by_key_subkey(hashed_address, key)?
.filter(|entry| entry.key == key)
.is_some()
{
hashed_storage_cursor.delete_current()?;
}
if !value.is_zero() {
hashed_storage_cursor.upsert(hashed_address, StorageEntry { key, value })?;
}
Ok(())
})
})?;
Ok(hashed_storage_keys)
}
fn insert_hashes(
&self,
range: RangeInclusive<BlockNumber>,
end_block_hash: B256,
expected_state_root: B256,
) -> ProviderResult<()> {
let mut account_prefix_set = PrefixSetMut::default();
let mut storage_prefix_sets: HashMap<B256, PrefixSetMut> = HashMap::default();
let mut destroyed_accounts = HashSet::default();
let mut durations_recorder = metrics::DurationsRecorder::default();
{
let lists = self.changed_storages_with_range(range.clone())?;
let storages = self.plain_state_storages(lists)?;
let storage_entries = self.insert_storage_for_hashing(storages)?;
for (hashed_address, hashed_slots) in storage_entries {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
for slot in hashed_slots {
storage_prefix_sets
.entry(hashed_address)
.or_default()
.insert(Nibbles::unpack(slot));
}
}
}
durations_recorder.record_relative(metrics::Action::InsertStorageHashing);
{
let lists = self.changed_accounts_with_range(range.clone())?;
let accounts = self.basic_accounts(lists)?;
let hashed_addresses = self.insert_account_for_hashing(accounts)?;
for (hashed_address, account) in hashed_addresses {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
if account.is_none() {
destroyed_accounts.insert(hashed_address);
}
}
}
durations_recorder.record_relative(metrics::Action::InsertAccountHashing);
{
let prefix_sets = TriePrefixSets {
account_prefix_set: account_prefix_set.freeze(),
storage_prefix_sets: storage_prefix_sets
.into_iter()
.map(|(k, v)| (k, v.freeze()))
.collect(),
destroyed_accounts,
};
let (state_root, trie_updates) = StateRoot::from_tx(&self.tx)
.with_prefix_sets(prefix_sets)
.root_with_updates()
.map_err(Into::<reth_db::DatabaseError>::into)?;
if state_root != expected_state_root {
return Err(ProviderError::StateRootMismatch(Box::new(RootMismatch {
root: GotExpected { got: state_root, expected: expected_state_root },
block_number: *range.end(),
block_hash: end_block_hash,
})))
}
self.write_trie_updates(&trie_updates)?;
}
durations_recorder.record_relative(metrics::Action::InsertMerkleTree);
debug!(target: "providers::db", ?range, actions = ?durations_recorder.actions, "Inserted hashes");
Ok(())
}
}
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
fn unwind_account_history_indices<'a>(
&self,
changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
) -> ProviderResult<usize> {
let mut last_indices = changesets
.into_iter()
.map(|(index, account)| (account.address, *index))
.collect::<Vec<_>>();
last_indices.sort_by_key(|(a, _)| *a);
let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
for &(address, rem_index) in &last_indices {
let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
&mut cursor,
ShardedKey::last(address),
rem_index,
|sharded_key| sharded_key.key == address,
)?;
if !partial_shard.is_empty() {
cursor.insert(
ShardedKey::last(address),
BlockNumberList::new_pre_sorted(partial_shard),
)?;
}
}
let changesets = last_indices.len();
Ok(changesets)
}
fn unwind_account_history_indices_range(
&self,
range: impl RangeBounds<BlockNumber>,
) -> ProviderResult<usize> {
let changesets = self
.tx
.cursor_read::<tables::AccountChangeSets>()?
.walk_range(range)?
.collect::<Result<Vec<_>, _>>()?;
self.unwind_account_history_indices(changesets.iter())
}
fn insert_account_history_index(
&self,
account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
) -> ProviderResult<()> {
self.append_history_index::<_, tables::AccountsHistory>(
account_transitions,
ShardedKey::new,
)
}
fn unwind_storage_history_indices(
&self,
changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
) -> ProviderResult<usize> {
let mut storage_changesets = changesets
.into_iter()
.map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
.collect::<Vec<_>>();
storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
for &(address, storage_key, rem_index) in &storage_changesets {
let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
&mut cursor,
StorageShardedKey::last(address, storage_key),
rem_index,
|storage_sharded_key| {
storage_sharded_key.address == address &&
storage_sharded_key.sharded_key.key == storage_key
},
)?;
if !partial_shard.is_empty() {
cursor.insert(
StorageShardedKey::last(address, storage_key),
BlockNumberList::new_pre_sorted(partial_shard),
)?;
}
}
let changesets = storage_changesets.len();
Ok(changesets)
}
fn unwind_storage_history_indices_range(
&self,
range: impl RangeBounds<BlockNumberAddress>,
) -> ProviderResult<usize> {
let changesets = self
.tx
.cursor_read::<tables::StorageChangeSets>()?
.walk_range(range)?
.collect::<Result<Vec<_>, _>>()?;
self.unwind_storage_history_indices(changesets.into_iter())
}
fn insert_storage_history_index(
&self,
storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
) -> ProviderResult<()> {
self.append_history_index::<_, tables::StoragesHistory>(
storage_transitions,
|(address, storage_key), highest_block_number| {
StorageShardedKey::new(address, storage_key, highest_block_number)
},
)
}
fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
{
let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
self.insert_account_history_index(indices)?;
}
{
let indices = self.changed_storages_and_blocks_with_range(range)?;
self.insert_storage_history_index(indices)?;
}
Ok(())
}
}
impl<TX: DbTx + 'static, N: NodeTypes> StateReader for DatabaseProvider<TX, N> {
fn get_state(&self, block: BlockNumber) -> ProviderResult<Option<ExecutionOutcome>> {
self.get_state(block..=block)
}
}
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes<ChainSpec: EthereumHardforks> + 'static>
BlockExecutionWriter for DatabaseProvider<TX, N>
{
fn take_block_and_execution_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Chain> {
let changed_accounts = self
.tx
.cursor_read::<tables::AccountChangeSets>()?
.walk_range(range.clone())?
.collect::<Result<Vec<_>, _>>()?;
let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
let mut destroyed_accounts = HashSet::default();
for (hashed_address, account) in hashed_addresses {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
if account.is_none() {
destroyed_accounts.insert(hashed_address);
}
}
self.unwind_account_history_indices(changed_accounts.iter())?;
let storage_range = BlockNumberAddress::range(range.clone());
let changed_storages = self
.tx
.cursor_read::<tables::StorageChangeSets>()?
.walk_range(storage_range)?
.collect::<Result<Vec<_>, _>>()?;
let mut storage_prefix_sets = HashMap::<B256, PrefixSet>::default();
let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
for (hashed_address, hashed_slots) in storage_entries {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
for slot in hashed_slots {
storage_prefix_set.insert(Nibbles::unpack(slot));
}
storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze());
}
self.unwind_storage_history_indices(changed_storages.iter().copied())?;
let prefix_sets = TriePrefixSets {
account_prefix_set: account_prefix_set.freeze(),
storage_prefix_sets,
destroyed_accounts,
};
let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx)
.with_prefix_sets(prefix_sets)
.root_with_updates()
.map_err(Into::<reth_db::DatabaseError>::into)?;
let parent_number = range.start().saturating_sub(1);
let parent_state_root = self
.header_by_number(parent_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
.state_root;
if new_state_root != parent_state_root {
let parent_hash = self
.block_hash(parent_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch {
root: GotExpected { got: new_state_root, expected: parent_state_root },
block_number: parent_number,
block_hash: parent_hash,
})))
}
self.write_trie_updates(&trie_updates)?;
let blocks = self.take_block_range(range.clone())?;
let unwind_to = blocks.first().map(|b| b.number.saturating_sub(1));
let execution_state = self.take_state(range.clone())?;
self.remove::<tables::BlockBodyIndices>(range)?;
if let Some(fork_number) = unwind_to {
self.update_pipeline_stages(fork_number, true)?;
}
Ok(Chain::new(blocks, execution_state, None))
}
fn remove_block_and_execution_range(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let changed_accounts = self
.tx
.cursor_read::<tables::AccountChangeSets>()?
.walk_range(range.clone())?
.collect::<Result<Vec<_>, _>>()?;
let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
let mut destroyed_accounts = HashSet::default();
for (hashed_address, account) in hashed_addresses {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
if account.is_none() {
destroyed_accounts.insert(hashed_address);
}
}
self.unwind_account_history_indices(changed_accounts.iter())?;
let storage_range = BlockNumberAddress::range(range.clone());
let changed_storages = self
.tx
.cursor_read::<tables::StorageChangeSets>()?
.walk_range(storage_range)?
.collect::<Result<Vec<_>, _>>()?;
let mut storage_prefix_sets = HashMap::<B256, PrefixSet>::default();
let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
for (hashed_address, hashed_slots) in storage_entries {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
for slot in hashed_slots {
storage_prefix_set.insert(Nibbles::unpack(slot));
}
storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze());
}
self.unwind_storage_history_indices(changed_storages.iter().copied())?;
let prefix_sets = TriePrefixSets {
account_prefix_set: account_prefix_set.freeze(),
storage_prefix_sets,
destroyed_accounts,
};
let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx)
.with_prefix_sets(prefix_sets)
.root_with_updates()
.map_err(Into::<reth_db::DatabaseError>::into)?;
let parent_number = range.start().saturating_sub(1);
let parent_state_root = self
.header_by_number(parent_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
.state_root;
if new_state_root != parent_state_root {
let parent_hash = self
.block_hash(parent_number)?
.ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch {
root: GotExpected { got: new_state_root, expected: parent_state_root },
block_number: parent_number,
block_hash: parent_hash,
})))
}
self.write_trie_updates(&trie_updates)?;
let blocks = self.take_block_range(range.clone())?;
let unwind_to = blocks.first().map(|b| b.number.saturating_sub(1));
self.remove_state(range.clone())?;
self.remove::<tables::BlockBodyIndices>(range)?;
if let Some(block_number) = unwind_to {
self.update_pipeline_stages(block_number, true)?;
}
Ok(())
}
}
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes<ChainSpec: EthereumHardforks> + 'static> BlockWriter
for DatabaseProvider<TX, N>
{
type Body = BlockBody;
fn insert_block(
&self,
block: SealedBlockWithSenders,
) -> ProviderResult<StoredBlockBodyIndices> {
let block_number = block.number;
let mut durations_recorder = metrics::DurationsRecorder::default();
self.tx.put::<tables::CanonicalHeaders>(block_number, block.hash())?;
durations_recorder.record_relative(metrics::Action::InsertCanonicalHeaders);
self.tx.put::<tables::Headers>(block_number, block.header.as_ref().clone())?;
durations_recorder.record_relative(metrics::Action::InsertHeaders);
self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
let ttd = if block_number == 0 {
block.difficulty
} else {
let parent_block_number = block_number - 1;
let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
durations_recorder.record_relative(metrics::Action::GetParentTD);
parent_ttd + block.difficulty
};
self.tx.put::<tables::HeaderTerminalDifficulties>(block_number, ttd.into())?;
durations_recorder.record_relative(metrics::Action::InsertHeaderTerminalDifficulties);
if !block.body.ommers.is_empty() {
self.tx.put::<tables::BlockOmmers>(
block_number,
StoredBlockOmmers { ommers: block.block.body.ommers },
)?;
durations_recorder.record_relative(metrics::Action::InsertBlockOmmers);
}
let mut next_tx_num = self
.tx
.cursor_read::<tables::TransactionBlocks>()?
.last()?
.map(|(n, _)| n + 1)
.unwrap_or_default();
durations_recorder.record_relative(metrics::Action::GetNextTxNum);
let first_tx_num = next_tx_num;
let tx_count = block.block.body.transactions.len() as u64;
let mut tx_senders_elapsed = Duration::default();
let mut transactions_elapsed = Duration::default();
let mut tx_hash_numbers_elapsed = Duration::default();
for (transaction, sender) in
block.block.body.transactions.into_iter().zip(block.senders.iter())
{
let hash = transaction.hash();
if self
.prune_modes
.sender_recovery
.as_ref()
.filter(|prune_mode| prune_mode.is_full())
.is_none()
{
let start = Instant::now();
self.tx.put::<tables::TransactionSenders>(next_tx_num, *sender)?;
tx_senders_elapsed += start.elapsed();
}
let start = Instant::now();
self.tx.put::<tables::Transactions>(next_tx_num, transaction.into())?;
let elapsed = start.elapsed();
if elapsed > Duration::from_secs(1) {
warn!(
target: "providers::db",
?block_number,
tx_num = %next_tx_num,
hash = %hash,
?elapsed,
"Transaction insertion took too long"
);
}
transactions_elapsed += elapsed;
if self
.prune_modes
.transaction_lookup
.filter(|prune_mode| prune_mode.is_full())
.is_none()
{
let start = Instant::now();
self.tx.put::<tables::TransactionHashNumbers>(hash, next_tx_num)?;
tx_hash_numbers_elapsed += start.elapsed();
}
next_tx_num += 1;
}
durations_recorder
.record_duration(metrics::Action::InsertTransactionSenders, tx_senders_elapsed);
durations_recorder
.record_duration(metrics::Action::InsertTransactions, transactions_elapsed);
durations_recorder.record_duration(
metrics::Action::InsertTransactionHashNumbers,
tx_hash_numbers_elapsed,
);
if let Some(withdrawals) = block.block.body.withdrawals {
if !withdrawals.is_empty() {
self.tx.put::<tables::BlockWithdrawals>(
block_number,
StoredBlockWithdrawals { withdrawals },
)?;
durations_recorder.record_relative(metrics::Action::InsertBlockWithdrawals);
}
}
let block_indices = StoredBlockBodyIndices { first_tx_num, tx_count };
self.tx.put::<tables::BlockBodyIndices>(block_number, block_indices)?;
durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
if !block_indices.is_empty() {
self.tx.put::<tables::TransactionBlocks>(block_indices.last_tx_num(), block_number)?;
durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
}
debug!(
target: "providers::db",
?block_number,
actions = ?durations_recorder.actions,
"Inserted block"
);
Ok(block_indices)
}
fn append_block_bodies(
&self,
bodies: impl Iterator<Item = (BlockNumber, Option<BlockBody>)>,
) -> ProviderResult<()> {
let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
let mut ommers_cursor = self.tx.cursor_write::<tables::BlockOmmers>()?;
let mut withdrawals_cursor = self.tx.cursor_write::<tables::BlockWithdrawals>()?;
let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
for (block_number, body) in bodies {
let tx_count = body.as_ref().map(|b| b.transactions.len() as u64).unwrap_or_default();
let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
block_indices_cursor.append(block_number, block_indices)?;
next_tx_num += tx_count;
let Some(body) = body else { continue };
if !body.transactions.is_empty() {
tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
}
if !body.ommers.is_empty() {
ommers_cursor.append(block_number, StoredBlockOmmers { ommers: body.ommers })?;
}
if let Some(withdrawals) = body.withdrawals {
if !withdrawals.is_empty() {
withdrawals_cursor
.append(block_number, StoredBlockWithdrawals { withdrawals })?;
}
}
}
Ok(())
}
fn append_blocks_with_state(
&self,
blocks: Vec<SealedBlockWithSenders>,
execution_outcome: ExecutionOutcome,
hashed_state: HashedPostStateSorted,
trie_updates: TrieUpdates,
) -> ProviderResult<()> {
if blocks.is_empty() {
debug!(target: "providers::db", "Attempted to append empty block range");
return Ok(())
}
let first_number = blocks.first().unwrap().number;
let last = blocks.last().unwrap();
let last_block_number = last.number;
let mut durations_recorder = metrics::DurationsRecorder::default();
for block in blocks {
self.insert_block(block)?;
durations_recorder.record_relative(metrics::Action::InsertBlock);
}
let mut storage_writer = UnifiedStorageWriter::from_database(self);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;
durations_recorder.record_relative(metrics::Action::InsertState);
self.write_hashed_state(&hashed_state)?;
self.write_trie_updates(&trie_updates)?;
durations_recorder.record_relative(metrics::Action::InsertHashes);
self.update_history_indices(first_number..=last_block_number)?;
durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
self.update_pipeline_stages(last_block_number, false)?;
durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
Ok(())
}
}
impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
fn get_prune_checkpoint(
&self,
segment: PruneSegment,
) -> ProviderResult<Option<PruneCheckpoint>> {
Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
}
fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
Ok(self
.tx
.cursor_read::<tables::PruneCheckpoints>()?
.walk(None)?
.collect::<Result<_, _>>()?)
}
}
impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
fn save_prune_checkpoint(
&self,
segment: PruneSegment,
checkpoint: PruneCheckpoint,
) -> ProviderResult<()> {
Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
}
}
impl<TX: DbTx + 'static, N: NodeTypes> StatsReader for DatabaseProvider<TX, N> {
fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
let db_entries = self.tx.entries::<T>()?;
let static_file_entries = match self.static_file_provider.count_entries::<T>() {
Ok(entries) => entries,
Err(ProviderError::UnsupportedProvider) => 0,
Err(err) => return Err(err),
};
Ok(db_entries + static_file_entries)
}
}
impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
let mut finalized_blocks = self
.tx
.cursor_read::<tables::ChainState>()?
.walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
.take(1)
.collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
Ok(last_finalized_block_number)
}
fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
let mut finalized_blocks = self
.tx
.cursor_read::<tables::ChainState>()?
.walk(Some(tables::ChainStateKey::LastSafeBlockBlock))?
.take(1)
.collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
Ok(last_finalized_block_number)
}
}
impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
Ok(self
.tx
.put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
}
fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
Ok(self
.tx
.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlockBlock, block_number)?)
}
}
impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
type Tx = TX;
fn tx_ref(&self) -> &Self::Tx {
&self.tx
}
fn tx_mut(&mut self) -> &mut Self::Tx {
&mut self.tx
}
fn into_tx(self) -> Self::Tx {
self.tx
}
fn prune_modes_ref(&self) -> &PruneModes {
self.prune_modes_ref()
}
}
fn recover_block_senders(
senders: &mut Vec<(u64, Address)>,
transactions: &[(u64, TransactionSigned)],
first_transaction: u64,
last_transaction: u64,
) -> ProviderResult<()> {
if senders.len() != transactions.len() {
if senders.len() > transactions.len() {
error!(target: "providers::db", senders=%senders.len(), transactions=%transactions.len(),
first_tx=%first_transaction, last_tx=%last_transaction,
"unexpected senders and transactions mismatch");
}
let missing = transactions.len().saturating_sub(senders.len());
senders.reserve(missing);
let mut missing_senders = Vec::with_capacity(missing);
{
let mut senders = senders.iter().peekable();
for (i, (tx_number, transaction)) in transactions.iter().enumerate() {
if let Some((sender_tx_number, _)) = senders.peek() {
if sender_tx_number == tx_number {
senders.next();
} else {
missing_senders.push((i, tx_number, transaction));
}
} else {
missing_senders.push((i, tx_number, transaction));
}
}
}
let recovered_senders = TransactionSigned::recover_signers(
missing_senders.iter().map(|(_, _, tx)| *tx).collect::<Vec<_>>(),
missing_senders.len(),
)
.ok_or(ProviderError::SenderRecoveryError)?;
for ((i, tx_number, _), sender) in missing_senders.into_iter().zip(recovered_senders) {
senders.insert(i, (*tx_number, sender));
}
debug_assert_eq!(senders.len(), transactions.len(), "missing one or more senders");
debug_assert!(senders.iter().tuple_windows().all(|(a, b)| a.0 < b.0), "senders not sorted");
}
Ok(())
}