reth_node_builder/components/
pool.rs

1//! Pool component for the node builder.
2
3use crate::{BuilderContext, FullNodeTypes};
4use alloy_primitives::Address;
5use reth_chain_state::CanonStateSubscriptions;
6use reth_chainspec::EthereumHardforks;
7use reth_node_api::{NodeTypes, TxTy};
8use reth_transaction_pool::{
9    blobstore::DiskFileBlobStore, BlobStore, CoinbaseTipOrdering, PoolConfig, PoolTransaction,
10    SubPoolLimit, TransactionPool, TransactionValidationTaskExecutor, TransactionValidator,
11};
12use std::{collections::HashSet, future::Future};
13
/// A type that knows how to build the transaction pool.
///
/// Implementors must be [`Send`] and are consumed by [`PoolBuilder::build_pool`].
pub trait PoolBuilder<Node: FullNodeTypes>: Send {
    /// The transaction pool to build.
    ///
    /// The pool's transaction type must use the node's transaction type ([`TxTy`]) as its
    /// `Consensus` representation.
    type Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>>
        + Unpin
        + 'static;

    /// Creates the transaction pool.
    ///
    /// Consumes the builder and returns a [`Send`] future that resolves to the built pool, or to
    /// an `eyre` error.
    fn build_pool(
        self,
        ctx: &BuilderContext<Node>,
    ) -> impl Future<Output = eyre::Result<Self::Pool>> + Send;
}
27
/// Blanket implementation so that any `Send` closure or function which takes a
/// [`BuilderContext`] reference and returns a `Send` future resolving to a pool can be used as a
/// [`PoolBuilder`].
impl<Node, F, Fut, Pool> PoolBuilder<Node> for F
where
    Node: FullNodeTypes,
    Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>>
        + Unpin
        + 'static,
    F: FnOnce(&BuilderContext<Node>) -> Fut + Send,
    Fut: Future<Output = eyre::Result<Pool>> + Send,
{
    /// The pool type produced by the closure's future.
    type Pool = Pool;

    /// Calls the closure with `ctx` and returns the future it produces.
    fn build_pool(
        self,
        ctx: &BuilderContext<Node>,
    ) -> impl Future<Output = eyre::Result<Self::Pool>> {
        self(ctx)
    }
}
46
/// Convenience type to override cli or default pool configuration during build.
///
/// When passed to [`PoolBuilderConfigOverrides::apply`], each sub-pool limit, the account slot
/// count and the minimal base fee replace the corresponding [`PoolConfig`] value only if they are
/// `Some`; `local_addresses` are added to the addresses already present in the config.
#[derive(Debug, Clone, Default)]
pub struct PoolBuilderConfigOverrides {
    /// Max number of transactions in the pending sub-pool
    pub pending_limit: Option<SubPoolLimit>,
    /// Max number of transactions in the basefee sub-pool
    pub basefee_limit: Option<SubPoolLimit>,
    /// Max number of transactions in the queued sub-pool
    pub queued_limit: Option<SubPoolLimit>,
    /// Max number of transactions in the blob sub-pool
    pub blob_limit: Option<SubPoolLimit>,
    /// Max number of executable transaction slots guaranteed per account
    pub max_account_slots: Option<usize>,
    /// Minimum base fee required by the protocol.
    pub minimal_protocol_basefee: Option<u64>,
    /// Addresses that will be considered as local. Above exemptions apply.
    pub local_addresses: HashSet<Address>,
    /// Additional tasks to validate new transactions.
    ///
    /// This value is not written into the [`PoolConfig`] by
    /// [`PoolBuilderConfigOverrides::apply`].
    pub additional_validation_tasks: Option<usize>,
}
67
68impl PoolBuilderConfigOverrides {
69    /// Applies the configured overrides to the given [`PoolConfig`].
70    pub fn apply(self, mut config: PoolConfig) -> PoolConfig {
71        let Self {
72            pending_limit,
73            basefee_limit,
74            queued_limit,
75            blob_limit,
76            max_account_slots,
77            minimal_protocol_basefee,
78            local_addresses,
79            additional_validation_tasks: _,
80        } = self;
81
82        if let Some(pending_limit) = pending_limit {
83            config.pending_limit = pending_limit;
84        }
85        if let Some(basefee_limit) = basefee_limit {
86            config.basefee_limit = basefee_limit;
87        }
88        if let Some(queued_limit) = queued_limit {
89            config.queued_limit = queued_limit;
90        }
91        if let Some(blob_limit) = blob_limit {
92            config.blob_limit = blob_limit;
93        }
94        if let Some(max_account_slots) = max_account_slots {
95            config.max_account_slots = max_account_slots;
96        }
97        if let Some(minimal_protocol_basefee) = minimal_protocol_basefee {
98            config.minimal_protocol_basefee = minimal_protocol_basefee;
99        }
100        config.local_transactions_config.local_addresses.extend(local_addresses);
101
102        config
103    }
104}
105
/// A builder for creating transaction pools with common configuration options.
///
/// This builder provides a fluent API for setting up transaction pools with various
/// configurations like blob stores, validators, and maintenance tasks.
///
/// `V` is the type of the configured validator and defaults to `()`.
pub struct TxPoolBuilder<'a, Node: FullNodeTypes, V = ()> {
    /// The builder context borrowed for the lifetime of this builder.
    ctx: &'a BuilderContext<Node>,
    /// The validator the pool is built with.
    validator: V,
}
114
impl<'a, Node: FullNodeTypes> TxPoolBuilder<'a, Node> {
    /// Creates a new `TxPoolBuilder` with the given context.
    ///
    /// The builder starts with the unit type `()` in the validator position.
    pub const fn new(ctx: &'a BuilderContext<Node>) -> Self {
        Self { ctx, validator: () }
    }
}
121
122impl<'a, Node: FullNodeTypes, V> TxPoolBuilder<'a, Node, V> {
123    /// Configure the validator for the transaction pool.
124    pub fn with_validator<NewV>(self, validator: NewV) -> TxPoolBuilder<'a, Node, NewV> {
125        TxPoolBuilder { ctx: self.ctx, validator }
126    }
127}
128
129impl<'a, Node, V> TxPoolBuilder<'a, Node, TransactionValidationTaskExecutor<V>>
130where
131    Node: FullNodeTypes<Types: NodeTypes<ChainSpec: EthereumHardforks>>,
132    V: TransactionValidator + 'static,
133    V::Transaction:
134        PoolTransaction<Consensus = TxTy<Node::Types>> + reth_transaction_pool::EthPoolTransaction,
135{
136    /// Consume the ype and build the [`reth_transaction_pool::Pool`] with the given config and blob
137    /// store.
138    pub fn build<BS>(
139        self,
140        blob_store: BS,
141        pool_config: PoolConfig,
142    ) -> reth_transaction_pool::Pool<
143        TransactionValidationTaskExecutor<V>,
144        CoinbaseTipOrdering<V::Transaction>,
145        BS,
146    >
147    where
148        BS: BlobStore,
149    {
150        let TxPoolBuilder { validator, .. } = self;
151        reth_transaction_pool::Pool::new(
152            validator,
153            CoinbaseTipOrdering::default(),
154            blob_store,
155            pool_config,
156        )
157    }
158
159    /// Build the transaction pool and spawn its maintenance tasks.
160    /// This method creates the blob store, builds the pool, and spawns maintenance tasks.
161    pub fn build_and_spawn_maintenance_task<BS>(
162        self,
163        blob_store: BS,
164        pool_config: PoolConfig,
165    ) -> eyre::Result<
166        reth_transaction_pool::Pool<
167            TransactionValidationTaskExecutor<V>,
168            CoinbaseTipOrdering<V::Transaction>,
169            BS,
170        >,
171    >
172    where
173        BS: BlobStore,
174    {
175        let ctx = self.ctx;
176        let transaction_pool = self.build(blob_store, pool_config);
177        // Spawn maintenance tasks using standalone functions
178        spawn_maintenance_tasks(ctx, transaction_pool.clone(), transaction_pool.config())?;
179
180        Ok(transaction_pool)
181    }
182}
183
184/// Create blob store with default configuration.
185pub fn create_blob_store<Node: FullNodeTypes>(
186    ctx: &BuilderContext<Node>,
187) -> eyre::Result<DiskFileBlobStore> {
188    let cache_size = Some(ctx.config().txpool.max_cached_entries);
189    create_blob_store_with_cache(ctx, cache_size)
190}
191
192/// Create blob store with custom cache size configuration for how many blobs should be cached in
193/// memory.
194pub fn create_blob_store_with_cache<Node: FullNodeTypes>(
195    ctx: &BuilderContext<Node>,
196    cache_size: Option<u32>,
197) -> eyre::Result<DiskFileBlobStore> {
198    let data_dir = ctx.config().datadir();
199    let config = if let Some(cache_size) = cache_size {
200        reth_transaction_pool::blobstore::DiskFileBlobStoreConfig::default()
201            .with_max_cached_entries(cache_size)
202    } else {
203        Default::default()
204    };
205
206    Ok(reth_transaction_pool::blobstore::DiskFileBlobStore::open(data_dir.blobstore(), config)?)
207}
208
209/// Spawn local transaction backup task if enabled.
210fn spawn_local_backup_task<Node, Pool>(ctx: &BuilderContext<Node>, pool: Pool) -> eyre::Result<()>
211where
212    Node: FullNodeTypes,
213    Pool: TransactionPool + Clone + 'static,
214{
215    if !ctx.config().txpool.disable_transactions_backup {
216        let data_dir = ctx.config().datadir();
217        let transactions_path = ctx
218            .config()
219            .txpool
220            .transactions_backup_path
221            .clone()
222            .unwrap_or_else(|| data_dir.txpool_transactions());
223
224        let transactions_backup_config =
225            reth_transaction_pool::maintain::LocalTransactionBackupConfig::with_local_txs_backup(
226                transactions_path,
227            );
228
229        ctx.task_executor().spawn_critical_with_graceful_shutdown_signal(
230            "local transactions backup task",
231            |shutdown| {
232                reth_transaction_pool::maintain::backup_local_transactions_task(
233                    shutdown,
234                    pool,
235                    transactions_backup_config,
236                )
237            },
238        );
239    }
240    Ok(())
241}
242
243/// Spawn the main maintenance task for transaction pool.
244fn spawn_pool_maintenance_task<Node, Pool>(
245    ctx: &BuilderContext<Node>,
246    pool: Pool,
247    pool_config: &PoolConfig,
248) -> eyre::Result<()>
249where
250    Node: FullNodeTypes<Types: NodeTypes<ChainSpec: EthereumHardforks>>,
251    Pool: reth_transaction_pool::TransactionPoolExt + Clone + 'static,
252    Pool::Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>,
253{
254    let chain_events = ctx.provider().canonical_state_stream();
255    let client = ctx.provider().clone();
256
257    ctx.task_executor().spawn_critical(
258        "txpool maintenance task",
259        reth_transaction_pool::maintain::maintain_transaction_pool_future(
260            client,
261            pool,
262            chain_events,
263            ctx.task_executor().clone(),
264            reth_transaction_pool::maintain::MaintainPoolConfig {
265                max_tx_lifetime: pool_config.max_queued_lifetime,
266                no_local_exemptions: pool_config.local_transactions_config.no_exemptions,
267                ..Default::default()
268            },
269        ),
270    );
271
272    Ok(())
273}
274
275/// Spawn all maintenance tasks for a transaction pool (backup + main maintenance).
276pub fn spawn_maintenance_tasks<Node, Pool>(
277    ctx: &BuilderContext<Node>,
278    pool: Pool,
279    pool_config: &PoolConfig,
280) -> eyre::Result<()>
281where
282    Node: FullNodeTypes<Types: NodeTypes<ChainSpec: EthereumHardforks>>,
283    Pool: reth_transaction_pool::TransactionPoolExt + Clone + 'static,
284    Pool::Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>,
285{
286    spawn_local_backup_task(ctx, pool.clone())?;
287    spawn_pool_maintenance_task(ctx, pool, pool_config)?;
288    Ok(())
289}
290
291impl<Node: FullNodeTypes, V: std::fmt::Debug> std::fmt::Debug for TxPoolBuilder<'_, Node, V> {
292    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
293        f.debug_struct("TxPoolBuilder").field("validator", &self.validator).finish()
294    }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use reth_transaction_pool::PoolConfig;

    /// Overrides that are set must show up in the resulting config.
    #[test]
    fn test_pool_builder_config_overrides_apply() {
        let overrides = PoolBuilderConfigOverrides {
            pending_limit: Some(SubPoolLimit::default()),
            max_account_slots: Some(100),
            minimal_protocol_basefee: Some(1000),
            ..Default::default()
        };

        let PoolConfig { max_account_slots, minimal_protocol_basefee, .. } =
            overrides.apply(PoolConfig::default());

        assert_eq!(max_account_slots, 100);
        assert_eq!(minimal_protocol_basefee, 1000);
    }

    /// A default set of overrides carries no values.
    #[test]
    fn test_pool_builder_config_overrides_default() {
        let defaults = PoolBuilderConfigOverrides::default();
        let PoolBuilderConfigOverrides { pending_limit, max_account_slots, local_addresses, .. } =
            defaults;

        assert!(pending_limit.is_none());
        assert!(max_account_slots.is_none());
        assert!(local_addresses.is_empty());
    }
}