use crate::{
args::RollupArgs,
engine::OpEngineValidator,
txpool::{OpTransactionPool, OpTransactionValidator},
OpEngineTypes,
};
use alloy_consensus::Header;
use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig};
use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
use reth_db::transaction::{DbTx, DbTxMut};
use reth_evm::{execute::BasicBlockExecutorProvider, ConfigureEvm};
use reth_network::{NetworkConfig, NetworkHandle, NetworkManager, NetworkPrimitives, PeersInfo};
use reth_node_api::{AddOnsContext, EngineValidator, FullNodeComponents, NodeAddOns, TxTy};
use reth_node_builder::{
components::{
ComponentsBuilder, ConsensusBuilder, ExecutorBuilder, NetworkBuilder,
PayloadServiceBuilder, PoolBuilder, PoolBuilderConfigOverrides,
},
node::{FullNodeTypes, NodeTypes, NodeTypesWithEngine},
rpc::{EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcAddOns, RpcHandle},
BuilderContext, Node, NodeAdapter, NodeComponentsBuilder, PayloadBuilderConfig,
};
use reth_optimism_chainspec::OpChainSpec;
use reth_optimism_consensus::OpBeaconConsensus;
use reth_optimism_evm::{OpEvmConfig, OpExecutionStrategyFactory};
use reth_optimism_payload_builder::{
builder::OpPayloadTransactions,
config::{OpBuilderConfig, OpDAConfig},
};
use reth_optimism_primitives::OpPrimitives;
use reth_optimism_rpc::{
miner::{MinerApiExtServer, OpMinerExtApi},
witness::{DebugExecutionWitnessApiServer, OpDebugWitnessApi},
OpEthApi, SequencerClient,
};
use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService};
use reth_primitives::{BlockBody, PooledTransaction, TransactionSigned};
use reth_provider::{
providers::ChainStorage, BlockBodyReader, BlockBodyWriter, CanonStateSubscriptions,
ChainSpecProvider, DBProvider, EthStorage, ProviderResult, ReadBodyInput, StorageLocation,
};
use reth_rpc_server_types::RethRpcModule;
use reth_tracing::tracing::{debug, info};
use reth_transaction_pool::{
blobstore::DiskFileBlobStore, CoinbaseTipOrdering, PoolTransaction, TransactionPool,
TransactionValidationTaskExecutor,
};
use reth_trie_db::MerklePatriciaTrie;
use std::sync::Arc;
/// Storage implementation used by the Optimism node.
///
/// Newtype around [`EthStorage`]; the block body reader and writer impls in this file forward
/// every call to the wrapped value.
#[derive(Debug, Default, Clone)]
pub struct OpStorage(EthStorage);
impl<Provider: DBProvider<Tx: DbTxMut>> BlockBodyWriter<Provider, BlockBody> for OpStorage {
    /// Writes the given `(block number, optional body)` pairs by forwarding to the wrapped
    /// [`EthStorage`].
    fn write_block_bodies(
        &self,
        provider: &Provider,
        bodies: Vec<(u64, Option<BlockBody>)>,
        write_to: StorageLocation,
    ) -> ProviderResult<()> {
        let Self(eth_storage) = self;
        eth_storage.write_block_bodies(provider, bodies, write_to)
    }

    /// Removes block bodies above `block` by forwarding to the wrapped [`EthStorage`].
    fn remove_block_bodies_above(
        &self,
        provider: &Provider,
        block: alloy_primitives::BlockNumber,
        remove_from: StorageLocation,
    ) -> ProviderResult<()> {
        let Self(eth_storage) = self;
        eth_storage.remove_block_bodies_above(provider, block, remove_from)
    }
}
impl<Provider> BlockBodyReader<Provider> for OpStorage
where
    Provider: DBProvider + ChainSpecProvider<ChainSpec: EthereumHardforks>,
{
    type Block = reth_primitives::Block;

    /// Reads the block bodies for `inputs` by forwarding to the wrapped [`EthStorage`].
    fn read_block_bodies(
        &self,
        provider: &Provider,
        inputs: Vec<ReadBodyInput<'_, Self::Block>>,
    ) -> ProviderResult<Vec<BlockBody>> {
        let Self(eth_storage) = self;
        eth_storage.read_block_bodies(provider, inputs)
    }
}
/// Exposes [`OpStorage`] as the chain storage for [`OpPrimitives`].
///
/// Both accessors hand back `self`, so the same value acts as reader and writer.
impl ChainStorage<OpPrimitives> for OpStorage {
/// Returns `self` as the storage reader for a database provider over `TX`/`Types`.
fn reader<TX, Types>(
&self,
) -> impl reth_provider::ChainStorageReader<reth_provider::DatabaseProvider<TX, Types>, OpPrimitives>
where
TX: DbTx + 'static,
Types: reth_provider::providers::NodeTypesForProvider<Primitives = OpPrimitives>,
{
self
}
/// Returns `self` as the storage writer for a database provider over a mutable `TX`.
fn writer<TX, Types>(
&self,
) -> impl reth_provider::ChainStorageWriter<reth_provider::DatabaseProvider<TX, Types>, OpPrimitives>
where
TX: DbTxMut + DbTx + 'static,
Types: NodeTypes<Primitives = OpPrimitives>,
{
self
}
}
/// Type configuration for an Optimism node.
///
/// Carries the rollup arguments and the DA config that are passed on to the component builders
/// and add-ons created from it.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct OpNode {
/// Rollup arguments; read when building the components and the add-ons.
pub args: RollupArgs,
/// DA config handed to the payload builder and to the add-ons.
pub da_config: OpDAConfig,
}
impl OpNode {
    /// Creates a node configuration from `args`, starting with the default [`OpDAConfig`].
    pub fn new(args: RollupArgs) -> Self {
        Self { args, da_config: Default::default() }
    }

    /// Returns the configuration with its [`OpDAConfig`] replaced by `da_config`.
    pub fn with_da_config(self, da_config: OpDAConfig) -> Self {
        Self { da_config, ..self }
    }

    /// Assembles the [`ComponentsBuilder`] with the Optimism pool, payload, network, executor
    /// and consensus builders, configured from the rollup arguments and DA config.
    pub fn components<Node>(
        &self,
    ) -> ComponentsBuilder<
        Node,
        OpPoolBuilder,
        OpPayloadBuilder,
        OpNetworkBuilder,
        OpExecutorBuilder,
        OpConsensusBuilder,
    >
    where
        Node: FullNodeTypes<
            Types: NodeTypesWithEngine<
                Engine = OpEngineTypes,
                ChainSpec = OpChainSpec,
                Primitives = OpPrimitives,
            >,
        >,
    {
        let rollup = &self.args;

        let payload_builder = OpPayloadBuilder::new(rollup.compute_pending_block)
            .with_da_config(self.da_config.clone());

        // The CLI flag enables discv4, whereas the network builder stores the negated form.
        let network_builder = OpNetworkBuilder {
            disable_txpool_gossip: rollup.disable_txpool_gossip,
            disable_discovery_v4: !rollup.discovery_v4,
        };

        ComponentsBuilder::default()
            .node_types::<Node>()
            .pool(OpPoolBuilder::default())
            .payload(payload_builder)
            .network(network_builder)
            .executor(OpExecutorBuilder::default())
            .consensus(OpConsensusBuilder::default())
    }
}
impl<N> Node<N> for OpNode
where
    N: FullNodeTypes<
        Types: NodeTypesWithEngine<
            Engine = OpEngineTypes,
            ChainSpec = OpChainSpec,
            Primitives = OpPrimitives,
            Storage = OpStorage,
        >,
    >,
{
    type ComponentsBuilder = ComponentsBuilder<
        N,
        OpPoolBuilder,
        OpPayloadBuilder,
        OpNetworkBuilder,
        OpExecutorBuilder,
        OpConsensusBuilder,
    >;

    type AddOns =
        OpAddOns<NodeAdapter<N, <Self::ComponentsBuilder as NodeComponentsBuilder<N>>::Components>>;

    /// Returns the components builder produced by [`OpNode::components`].
    fn components_builder(&self) -> Self::ComponentsBuilder {
        self.components::<N>()
    }

    /// Builds the add-ons from the configured sequencer HTTP endpoint and DA config.
    fn add_ons(&self) -> Self::AddOns {
        let sequencer_http = self.args.sequencer_http.clone();
        let da_config = self.da_config.clone();
        Self::AddOns::builder().with_sequencer(sequencer_http).with_da_config(da_config).build()
    }
}
/// Selects the primitives, chain spec, state commitment and storage types of the Optimism node.
impl NodeTypes for OpNode {
type Primitives = OpPrimitives;
type ChainSpec = OpChainSpec;
type StateCommitment = MerklePatriciaTrie;
type Storage = OpStorage;
}
/// Selects [`OpEngineTypes`] as the engine types of the Optimism node.
impl NodeTypesWithEngine for OpNode {
type Engine = OpEngineTypes;
}
/// Add-ons for the Optimism node: the RPC add-ons plus the DA config used by the miner extension.
#[derive(Debug)]
pub struct OpAddOns<N: FullNodeComponents> {
/// RPC add-ons, parameterized with [`OpEthApi`] and [`OpEngineValidatorBuilder`].
pub rpc_add_ons: RpcAddOns<N, OpEthApi<N>, OpEngineValidatorBuilder>,
/// DA config; handed to [`OpMinerExtApi::new`] when the add-ons are launched.
pub da_config: OpDAConfig,
}
impl<N: FullNodeComponents<Types: NodeTypes<Primitives = OpPrimitives>>> Default for OpAddOns<N> {
fn default() -> Self {
Self::builder().build()
}
}
impl<N> OpAddOns<N>
where
    N: FullNodeComponents<Types: NodeTypes<Primitives = OpPrimitives>>,
{
    /// Returns a fresh [`OpAddOnsBuilder`] in its default state.
    pub fn builder() -> OpAddOnsBuilder {
        Default::default()
    }
}
impl<N> NodeAddOns<N> for OpAddOns<N>
where
    N: FullNodeComponents<
        Types: NodeTypesWithEngine<
            ChainSpec = OpChainSpec,
            Primitives = OpPrimitives,
            Storage = OpStorage,
            Engine = OpEngineTypes,
        >,
    >,
    OpEngineValidator: EngineValidator<<N::Types as NodeTypesWithEngine>::Engine>,
{
    type Handle = RpcHandle<N, OpEthApi<N>>;

    /// Launches the RPC add-ons and registers the Optimism RPC extensions.
    ///
    /// Two extensions are created up front: the debug witness API (from the node's provider,
    /// EVM config and task executor) and the miner extension API (from the DA config). They are
    /// merged into the regular modules only when the `Debug` / `Miner` module is configured.
    /// When `Miner` is configured, the miner extension is also merged into the auth modules.
    ///
    /// # Errors
    ///
    /// Propagates any error from merging the RPC modules or from launching the underlying
    /// RPC add-ons.
    async fn launch_add_ons(
        self,
        ctx: reth_node_api::AddOnsContext<'_, N>,
    ) -> eyre::Result<Self::Handle> {
        let Self { rpc_add_ons, da_config } = self;

        let debug_ext = OpDebugWitnessApi::new(
            ctx.node.provider().clone(),
            ctx.node.evm_config().clone(),
            Box::new(ctx.node.task_executor().clone()),
        );
        let miner_ext = OpMinerExtApi::new(da_config);

        rpc_add_ons
            .launch_add_ons_with(ctx, move |modules, auth_modules| {
                debug!(target: "reth::cli", "Installing debug payload witness rpc endpoint");
                modules.merge_if_module_configured(RethRpcModule::Debug, debug_ext.into_rpc())?;

                // The miner extension is cloned here because it is installed a second time on
                // the auth server below.
                modules.merge_if_module_configured(
                    RethRpcModule::Miner,
                    miner_ext.clone().into_rpc(),
                )?;

                if modules.module_config().contains_any(&RethRpcModule::Miner) {
                    debug!(target: "reth::cli", "Installing miner DA rpc endpoint");
                    auth_modules.merge_auth_methods(miner_ext.into_rpc())?;
                }

                Ok(())
            })
            .await
    }
}
impl<N> RethRpcAddOns<N> for OpAddOns<N>
where
    N: FullNodeComponents<
        Types: NodeTypesWithEngine<
            ChainSpec = OpChainSpec,
            Primitives = OpPrimitives,
            Storage = OpStorage,
            Engine = OpEngineTypes,
        >,
    >,
    OpEngineValidator: EngineValidator<<N::Types as NodeTypesWithEngine>::Engine>,
{
    type EthApi = OpEthApi<N>;

    /// Gives mutable access to the RPC hooks of the wrapped [`RpcAddOns`].
    fn hooks_mut(&mut self) -> &mut reth_node_builder::rpc::RpcHooks<N, Self::EthApi> {
        let Self { rpc_add_ons, .. } = self;
        rpc_add_ons.hooks_mut()
    }
}
impl<N> EngineValidatorAddOn<N> for OpAddOns<N>
where
    N: FullNodeComponents<
        Types: NodeTypesWithEngine<
            ChainSpec = OpChainSpec,
            Primitives = OpPrimitives,
            Engine = OpEngineTypes,
        >,
    >,
{
    type Validator = OpEngineValidator;

    /// Builds the engine validator with a default [`OpEngineValidatorBuilder`].
    async fn engine_validator(&self, ctx: &AddOnsContext<'_, N>) -> eyre::Result<Self::Validator> {
        let validator_builder = OpEngineValidatorBuilder::default();
        validator_builder.build(ctx).await
    }
}
/// Builder for [`OpAddOns`].
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct OpAddOnsBuilder {
/// Optional sequencer client; passed to the [`OpEthApi`] builder when the add-ons are built.
sequencer_client: Option<SequencerClient>,
/// Optional DA config; the default [`OpDAConfig`] is used when left unset.
da_config: Option<OpDAConfig>,
}
impl OpAddOnsBuilder {
    /// Sets the sequencer client, created with [`SequencerClient::new`] from the given string.
    /// Passing `None` clears it.
    pub fn with_sequencer(self, sequencer_client: Option<String>) -> Self {
        Self { sequencer_client: sequencer_client.map(SequencerClient::new), ..self }
    }

    /// Sets the [`OpDAConfig`] to use for the add-ons.
    pub fn with_da_config(self, da_config: OpDAConfig) -> Self {
        Self { da_config: Some(da_config), ..self }
    }
}
impl OpAddOnsBuilder {
    /// Consumes the builder and produces the [`OpAddOns`].
    ///
    /// The RPC add-ons are set up with a closure that builds an [`OpEthApi`] using the
    /// configured sequencer client. An unset DA config falls back to its default.
    pub fn build<N>(self) -> OpAddOns<N>
    where
        N: FullNodeComponents<Types: NodeTypes<Primitives = OpPrimitives>>,
    {
        let sequencer_client = self.sequencer_client;
        let da_config = self.da_config.unwrap_or_default();

        let rpc_add_ons = RpcAddOns::new(
            move |ctx| OpEthApi::<N>::builder().with_sequencer(sequencer_client).build(ctx),
            Default::default(),
        );

        OpAddOns { rpc_add_ons, da_config }
    }
}
/// Builder for the Optimism EVM config and block executor.
#[derive(Debug, Default, Clone, Copy)]
#[non_exhaustive]
pub struct OpExecutorBuilder;
impl<Node> ExecutorBuilder<Node> for OpExecutorBuilder
where
    Node: FullNodeTypes<Types: NodeTypes<ChainSpec = OpChainSpec, Primitives = OpPrimitives>>,
{
    type EVM = OpEvmConfig;
    type Executor = BasicBlockExecutorProvider<OpExecutionStrategyFactory>;

    /// Creates the [`OpEvmConfig`] and a block executor provider backed by an
    /// [`OpExecutionStrategyFactory`], both based on the context's chain spec.
    async fn build_evm(
        self,
        ctx: &BuilderContext<Node>,
    ) -> eyre::Result<(Self::EVM, Self::Executor)> {
        let evm_config = OpEvmConfig::new(ctx.chain_spec());
        let executor = BasicBlockExecutorProvider::new(OpExecutionStrategyFactory::new(
            ctx.chain_spec(),
            evm_config.clone(),
        ));

        Ok((evm_config, executor))
    }
}
/// Builder for the Optimism transaction pool.
#[derive(Debug, Default, Clone)]
pub struct OpPoolBuilder {
/// Overrides applied on top of the pool config taken from the builder context.
pub pool_config_overrides: PoolBuilderConfigOverrides,
}
impl<Node> PoolBuilder<Node> for OpPoolBuilder
where
    Node: FullNodeTypes<Types: NodeTypes<ChainSpec = OpChainSpec, Primitives = OpPrimitives>>,
{
    type Pool = OpTransactionPool<Node::Provider, DiskFileBlobStore>;

    /// Builds the transaction pool and spawns its background tasks.
    ///
    /// Steps:
    /// 1. open the on-disk blob store under the node's data directory,
    /// 2. build the validation task executor with EIP-4844 support turned off and wrap each
    ///    validator in an [`OpTransactionValidator`],
    /// 3. create the pool with coinbase-tip ordering and the (possibly overridden) pool config,
    /// 4. spawn the local-transactions backup task and the pool maintenance task.
    ///
    /// # Errors
    ///
    /// Fails if the blob store cannot be opened or the KZG settings cannot be loaded.
    async fn build_pool(self, ctx: &BuilderContext<Node>) -> eyre::Result<Self::Pool> {
        let overrides = self.pool_config_overrides;
        let data_dir = ctx.config().datadir();
        let blob_store = DiskFileBlobStore::open(data_dir.blobstore(), Default::default())?;

        // An explicit override wins over the value from the node's txpool config.
        let additional_tasks = overrides
            .additional_validation_tasks
            .unwrap_or_else(|| ctx.config().txpool.additional_validation_tasks);
        // The L1 data gas fee requirement is switched off in dev mode.
        let require_l1_data_gas_fee = !ctx.config().dev.dev;

        let eth_chain_spec = Arc::new(ctx.chain_spec().inner.clone());
        let validator = TransactionValidationTaskExecutor::eth_builder(eth_chain_spec)
            .no_eip4844()
            .with_head_timestamp(ctx.head().timestamp)
            .kzg_settings(ctx.kzg_settings()?)
            .with_additional_tasks(additional_tasks)
            .build_with_tasks(
                ctx.provider().clone(),
                ctx.task_executor().clone(),
                blob_store.clone(),
            )
            .map(|eth_validator| {
                OpTransactionValidator::new(eth_validator)
                    .require_l1_data_gas_fee(require_l1_data_gas_fee)
            });

        let transaction_pool = reth_transaction_pool::Pool::new(
            validator,
            CoinbaseTipOrdering::default(),
            blob_store,
            overrides.apply(ctx.pool_config()),
        );
        info!(target: "reth::cli", "Transaction pool initialized");

        // Everything the background tasks need is prepared before the first task is spawned.
        let backup_path = data_dir.txpool_transactions();
        let backup_pool = transaction_pool.clone();
        let chain_events = ctx.provider().canonical_state_stream();
        let client = ctx.provider().clone();
        let maintained_pool = transaction_pool.clone();
        let backup_config =
            reth_transaction_pool::maintain::LocalTransactionBackupConfig::with_local_txs_backup(
                backup_path,
            );

        ctx.task_executor().spawn_critical_with_graceful_shutdown_signal(
            "local transactions backup task",
            |shutdown| {
                reth_transaction_pool::maintain::backup_local_transactions_task(
                    shutdown,
                    backup_pool,
                    backup_config,
                )
            },
        );

        ctx.task_executor().spawn_critical(
            "txpool maintenance task",
            reth_transaction_pool::maintain::maintain_transaction_pool_future(
                client,
                maintained_pool,
                chain_events,
                ctx.task_executor().clone(),
                Default::default(),
            ),
        );
        debug!(target: "reth::cli", "Spawned txpool maintenance task");

        Ok(transaction_pool)
    }
}
/// Builder for the Optimism payload builder service.
///
/// `Txs` is the type of the transaction source forwarded to the underlying payload builder; it
/// defaults to `()`.
#[derive(Debug, Default, Clone)]
pub struct OpPayloadBuilder<Txs = ()> {
/// Forwarded to `set_compute_pending_block` on the underlying payload builder.
pub compute_pending_block: bool,
/// Forwarded to `with_transactions` on the underlying payload builder.
pub best_transactions: Txs,
/// DA config, passed to the underlying payload builder inside an [`OpBuilderConfig`].
pub da_config: OpDAConfig,
}
impl OpPayloadBuilder {
pub fn new(compute_pending_block: bool) -> Self {
Self { compute_pending_block, best_transactions: (), da_config: OpDAConfig::default() }
}
pub fn with_da_config(mut self, da_config: OpDAConfig) -> Self {
self.da_config = da_config;
self
}
}
impl<Txs> OpPayloadBuilder<Txs>
where
    Txs: OpPayloadTransactions,
{
    /// Swaps the transaction source for `best_transactions`, keeping the other settings.
    pub fn with_transactions<T: OpPayloadTransactions>(
        self,
        best_transactions: T,
    ) -> OpPayloadBuilder<T> {
        OpPayloadBuilder {
            compute_pending_block: self.compute_pending_block,
            best_transactions,
            da_config: self.da_config,
        }
    }

    /// Spawns the payload builder service as a critical task and returns its handle.
    ///
    /// The underlying Optimism payload builder is created from `evm_config` and this builder's
    /// settings. The job generator takes its interval, deadline and maximum number of payload
    /// tasks from the context's payload builder config.
    pub fn spawn<Node, Evm, Pool>(
        self,
        evm_config: Evm,
        ctx: &BuilderContext<Node>,
        pool: Pool,
    ) -> eyre::Result<PayloadBuilderHandle<OpEngineTypes>>
    where
        Node: FullNodeTypes<
            Types: NodeTypesWithEngine<
                Engine = OpEngineTypes,
                ChainSpec = OpChainSpec,
                Primitives = OpPrimitives,
            >,
        >,
        Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>>
            + Unpin
            + 'static,
        Evm: ConfigureEvm<Header = Header, Transaction = TransactionSigned>,
    {
        let Self { compute_pending_block, best_transactions, da_config } = self;

        let op_builder = reth_optimism_payload_builder::OpPayloadBuilder::with_builder_config(
            evm_config,
            OpBuilderConfig { da_config },
        )
        .with_transactions(best_transactions)
        .set_compute_pending_block(compute_pending_block);

        let builder_conf = ctx.payload_builder_config();
        let job_config = BasicPayloadJobGeneratorConfig::default()
            .interval(builder_conf.interval())
            .deadline(builder_conf.deadline())
            .max_payload_tasks(builder_conf.max_payload_tasks());

        let job_generator = BasicPayloadJobGenerator::with_builder(
            ctx.provider().clone(),
            pool,
            ctx.task_executor().clone(),
            job_config,
            op_builder,
        );

        let (service, handle) =
            PayloadBuilderService::new(job_generator, ctx.provider().canonical_state_stream());
        ctx.task_executor().spawn_critical("payload builder service", Box::pin(service));

        Ok(handle)
    }
}
impl<Node, Pool, Txs> PayloadServiceBuilder<Node, Pool> for OpPayloadBuilder<Txs>
where
    Node: FullNodeTypes<
        Types: NodeTypesWithEngine<
            Engine = OpEngineTypes,
            ChainSpec = OpChainSpec,
            Primitives = OpPrimitives,
        >,
    >,
    Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>>
        + Unpin
        + 'static,
    Txs: OpPayloadTransactions,
{
    /// Spawns the payload service via [`OpPayloadBuilder::spawn`], using an [`OpEvmConfig`]
    /// created from the context's chain spec.
    async fn spawn_payload_service(
        self,
        ctx: &BuilderContext<Node>,
        pool: Pool,
    ) -> eyre::Result<PayloadBuilderHandle<OpEngineTypes>> {
        let evm_config = OpEvmConfig::new(ctx.chain_spec());
        self.spawn(evm_config, ctx, pool)
    }
}
/// Builder for the Optimism p2p network.
#[derive(Debug, Default, Clone)]
pub struct OpNetworkBuilder {
/// Copied into `tx_gossip_disabled` of the resulting network config.
pub disable_txpool_gossip: bool,
/// When `true`, discv4 discovery is disabled on the network config builder.
pub disable_discovery_v4: bool,
}
impl OpNetworkBuilder {
    /// Produces the [`NetworkConfig`] for the node.
    ///
    /// Discv4 is disabled when this builder asks for it or when discovery is disabled in the
    /// node's network args. Unless discovery is disabled in the args, discv5 is configured with
    /// the resolved bootnodes, falling back to the chain spec's bootnodes and then to an empty
    /// default. Finally the transaction gossip flag is copied onto the config.
    ///
    /// # Errors
    ///
    /// Fails if the context cannot create the network config builder.
    pub fn network_config<Node>(
        &self,
        ctx: &BuilderContext<Node>,
    ) -> eyre::Result<NetworkConfig<<Node as FullNodeTypes>::Provider, OpNetworkPrimitives>>
    where
        Node: FullNodeTypes<Types: NodeTypes<ChainSpec: Hardforks>>,
    {
        let tx_gossip_disabled = self.disable_txpool_gossip;
        let discv4_disabled = self.disable_discovery_v4;
        let net_args = &ctx.config().network;

        let config_builder = ctx.network_config_builder()?.apply(|builder| {
            let discovery_disabled = net_args.discovery.disable_discovery;

            let builder = if discv4_disabled || discovery_disabled {
                builder.disable_discv4_discovery()
            } else {
                builder
            };

            // With discovery switched off entirely there is no discv5 to configure.
            if discovery_disabled {
                return builder;
            }

            let rlpx_socket = (net_args.addr, net_args.port).into();
            let bootnodes = net_args
                .resolved_bootnodes()
                .or_else(|| ctx.chain_spec().bootnodes())
                .unwrap_or_default();

            builder.discovery_v5(net_args.discovery.discovery_v5_builder(rlpx_socket, bootnodes))
        });

        let mut config = ctx.build_network_config(config_builder);
        config.tx_gossip_disabled = tx_gossip_disabled;

        Ok(config)
    }
}
impl<Node, Pool> NetworkBuilder<Node, Pool> for OpNetworkBuilder
where
    Node: FullNodeTypes<Types: NodeTypes<ChainSpec = OpChainSpec, Primitives = OpPrimitives>>,
    Pool: TransactionPool<
            Transaction: PoolTransaction<Consensus = TxTy<Node::Types>, Pooled = PooledTransaction>,
        > + Unpin
        + 'static,
{
    type Primitives = OpNetworkPrimitives;

    /// Creates the network manager from [`OpNetworkBuilder::network_config`], starts the
    /// network with the given pool and returns its handle.
    async fn build_network(
        self,
        ctx: &BuilderContext<Node>,
        pool: Pool,
    ) -> eyre::Result<NetworkHandle<Self::Primitives>> {
        let config = self.network_config(ctx)?;
        let manager = NetworkManager::builder(config).await?;

        let handle = ctx.start_network(manager, pool);
        info!(target: "reth::cli", enode=%handle.local_node_record(), "P2P networking initialized");

        Ok(handle)
    }
}
/// Builder for the Optimism consensus implementation.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct OpConsensusBuilder;
impl<Node> ConsensusBuilder<Node> for OpConsensusBuilder
where
    Node: FullNodeTypes<Types: NodeTypes<ChainSpec = OpChainSpec, Primitives = OpPrimitives>>,
{
    type Consensus = Arc<OpBeaconConsensus>;

    /// Creates an [`OpBeaconConsensus`] from the context's chain spec and wraps it in an `Arc`.
    async fn build_consensus(self, ctx: &BuilderContext<Node>) -> eyre::Result<Self::Consensus> {
        let consensus = OpBeaconConsensus::new(ctx.chain_spec());
        Ok(Arc::new(consensus))
    }
}
/// Builder for [`OpEngineValidator`].
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct OpEngineValidatorBuilder;
impl<Node, Types> EngineValidatorBuilder<Node> for OpEngineValidatorBuilder
where
    Types: NodeTypesWithEngine<
        ChainSpec = OpChainSpec,
        Primitives = OpPrimitives,
        Engine = OpEngineTypes,
    >,
    Node: FullNodeComponents<Types = Types>,
{
    type Validator = OpEngineValidator;

    /// Creates an [`OpEngineValidator`] from a clone of the chain held by the add-ons config.
    async fn build(self, ctx: &AddOnsContext<'_, Node>) -> eyre::Result<Self::Validator> {
        let chain = ctx.config.chain.clone();
        Ok(OpEngineValidator::new(chain))
    }
}
/// Marker type that selects the network primitive types used by the Optimism node.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct OpNetworkPrimitives;
/// Maps each network primitive to a concrete type: the header comes from `alloy_consensus`,
/// every other type from `reth_primitives`.
impl NetworkPrimitives for OpNetworkPrimitives {
type BlockHeader = alloy_consensus::Header;
type BlockBody = reth_primitives::BlockBody;
type Block = reth_primitives::Block;
type BroadcastedTransaction = reth_primitives::TransactionSigned;
type PooledTransaction = reth_primitives::PooledTransaction;
type Receipt = reth_primitives::Receipt;
}