Skip to main content

reth_node_builder/components/
pool.rs

1//! Pool component for the node builder.
2
3use crate::{BuilderContext, FullNodeTypes};
4use alloy_primitives::map::AddressSet;
5use reth_chain_state::CanonStateSubscriptions;
6use reth_chainspec::EthereumHardforks;
7use reth_node_api::{BlockTy, NodeTypes, TxTy};
8use reth_transaction_pool::{
9    blobstore::DiskFileBlobStore, BlobStore, CoinbaseTipOrdering, PoolConfig, PoolTransaction,
10    SubPoolLimit, TransactionOrdering, TransactionPool, TransactionValidationTaskExecutor,
11    TransactionValidator,
12};
13use std::future::Future;
14
15/// A type that knows how to build the transaction pool.
16pub trait PoolBuilder<Node: FullNodeTypes, Evm>: Send {
17    /// The transaction pool to build.
18    type Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>>
19        + Unpin
20        + 'static;
21
22    /// Creates the transaction pool.
23    fn build_pool(
24        self,
25        ctx: &BuilderContext<Node>,
26        evm_config: Evm,
27    ) -> impl Future<Output = eyre::Result<Self::Pool>> + Send;
28}
29
30impl<Node, F, Fut, Pool, Evm> PoolBuilder<Node, Evm> for F
31where
32    Node: FullNodeTypes,
33    Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>>
34        + Unpin
35        + 'static,
36    F: FnOnce(&BuilderContext<Node>, Evm) -> Fut + Send,
37    Fut: Future<Output = eyre::Result<Pool>> + Send,
38{
39    type Pool = Pool;
40
41    fn build_pool(
42        self,
43        ctx: &BuilderContext<Node>,
44        evm_config: Evm,
45    ) -> impl Future<Output = eyre::Result<Self::Pool>> {
46        self(ctx, evm_config)
47    }
48}
49
50/// Convenience type to override cli or default pool configuration during build.
51#[derive(Debug, Clone, Default)]
52pub struct PoolBuilderConfigOverrides {
53    /// Max number of transaction in the pending sub-pool
54    pub pending_limit: Option<SubPoolLimit>,
55    /// Max number of transaction in the basefee sub-pool
56    pub basefee_limit: Option<SubPoolLimit>,
57    /// Max number of transaction in the queued sub-pool
58    pub queued_limit: Option<SubPoolLimit>,
59    /// Max number of transactions in the blob sub-pool
60    pub blob_limit: Option<SubPoolLimit>,
61    /// Max number of executable transaction slots guaranteed per account
62    pub max_account_slots: Option<usize>,
63    /// Minimum base fee required by the protocol.
64    pub minimal_protocol_basefee: Option<u64>,
65    /// Addresses that will be considered as local. Above exemptions apply.
66    pub local_addresses: AddressSet,
67    /// Additional tasks to validate new transactions.
68    pub additional_validation_tasks: Option<usize>,
69}
70
71impl PoolBuilderConfigOverrides {
72    /// Applies the configured overrides to the given [`PoolConfig`].
73    pub fn apply(self, mut config: PoolConfig) -> PoolConfig {
74        let Self {
75            pending_limit,
76            basefee_limit,
77            queued_limit,
78            blob_limit,
79            max_account_slots,
80            minimal_protocol_basefee,
81            local_addresses,
82            additional_validation_tasks: _,
83        } = self;
84
85        if let Some(pending_limit) = pending_limit {
86            config.pending_limit = pending_limit;
87        }
88        if let Some(basefee_limit) = basefee_limit {
89            config.basefee_limit = basefee_limit;
90        }
91        if let Some(queued_limit) = queued_limit {
92            config.queued_limit = queued_limit;
93        }
94        if let Some(blob_limit) = blob_limit {
95            config.blob_limit = blob_limit;
96        }
97        if let Some(max_account_slots) = max_account_slots {
98            config.max_account_slots = max_account_slots;
99        }
100        if let Some(minimal_protocol_basefee) = minimal_protocol_basefee {
101            config.minimal_protocol_basefee = minimal_protocol_basefee;
102        }
103        config.local_transactions_config.local_addresses.extend(local_addresses);
104
105        config
106    }
107}
108
109/// A builder for creating transaction pools with common configuration options.
110///
111/// This builder provides a fluent API for setting up transaction pools with various
112/// configurations like blob stores, validators, and maintenance tasks.
113pub struct TxPoolBuilder<'a, Node: FullNodeTypes, V = ()> {
114    ctx: &'a BuilderContext<Node>,
115    validator: V,
116}
117
118impl<'a, Node: FullNodeTypes> TxPoolBuilder<'a, Node> {
119    /// Creates a new `TxPoolBuilder` with the given context.
120    pub const fn new(ctx: &'a BuilderContext<Node>) -> Self {
121        Self { ctx, validator: () }
122    }
123}
124
125impl<'a, Node: FullNodeTypes, V> TxPoolBuilder<'a, Node, V> {
126    /// Configure the validator for the transaction pool.
127    pub fn with_validator<NewV>(self, validator: NewV) -> TxPoolBuilder<'a, Node, NewV> {
128        TxPoolBuilder { ctx: self.ctx, validator }
129    }
130}
131
132impl<'a, Node, V> TxPoolBuilder<'a, Node, TransactionValidationTaskExecutor<V>>
133where
134    Node: FullNodeTypes<Types: NodeTypes<ChainSpec: EthereumHardforks>>,
135    V: TransactionValidator<Block = BlockTy<Node::Types>> + 'static,
136    V::Transaction:
137        PoolTransaction<Consensus = TxTy<Node::Types>> + reth_transaction_pool::EthPoolTransaction,
138{
139    /// Consume the ype and build the [`reth_transaction_pool::Pool`] with the given config and blob
140    /// store.
141    pub fn build<BS>(
142        self,
143        blob_store: BS,
144        pool_config: PoolConfig,
145    ) -> reth_transaction_pool::Pool<
146        TransactionValidationTaskExecutor<V>,
147        CoinbaseTipOrdering<V::Transaction>,
148        BS,
149    >
150    where
151        BS: BlobStore,
152    {
153        let TxPoolBuilder { validator, .. } = self;
154        reth_transaction_pool::Pool::new(
155            validator,
156            CoinbaseTipOrdering::default(),
157            blob_store,
158            pool_config,
159        )
160    }
161
162    /// Build the transaction pool and spawn its maintenance tasks.
163    /// This method creates the blob store, builds the pool, and spawns maintenance tasks.
164    pub fn build_and_spawn_maintenance_task<BS>(
165        self,
166        blob_store: BS,
167        pool_config: PoolConfig,
168    ) -> eyre::Result<
169        reth_transaction_pool::Pool<
170            TransactionValidationTaskExecutor<V>,
171            CoinbaseTipOrdering<V::Transaction>,
172            BS,
173        >,
174    >
175    where
176        BS: BlobStore,
177    {
178        self.build_with_ordering_and_spawn_maintenance_task(
179            CoinbaseTipOrdering::default(),
180            blob_store,
181            pool_config,
182        )
183    }
184
185    /// Build the transaction pool with a custom [`TransactionOrdering`] and spawn its maintenance
186    /// tasks.
187    pub fn build_with_ordering_and_spawn_maintenance_task<BS, O>(
188        self,
189        ordering: O,
190        blob_store: BS,
191        pool_config: PoolConfig,
192    ) -> eyre::Result<reth_transaction_pool::Pool<TransactionValidationTaskExecutor<V>, O, BS>>
193    where
194        BS: BlobStore,
195        O: TransactionOrdering<Transaction = V::Transaction>,
196    {
197        let TxPoolBuilder { ctx, validator, .. } = self;
198
199        let transaction_pool =
200            reth_transaction_pool::Pool::new(validator, ordering, blob_store, pool_config.clone());
201
202        spawn_maintenance_tasks(ctx, transaction_pool.clone(), &pool_config)?;
203
204        Ok(transaction_pool)
205    }
206}
207
208/// Create blob store with default configuration.
209pub fn create_blob_store<Node: FullNodeTypes>(
210    ctx: &BuilderContext<Node>,
211) -> eyre::Result<DiskFileBlobStore> {
212    let cache_size = Some(ctx.config().txpool.max_cached_entries);
213    create_blob_store_with_cache(ctx, cache_size)
214}
215
216/// Create blob store with custom cache size configuration for how many blobs should be cached in
217/// memory.
218pub fn create_blob_store_with_cache<Node: FullNodeTypes>(
219    ctx: &BuilderContext<Node>,
220    cache_size: Option<u32>,
221) -> eyre::Result<DiskFileBlobStore> {
222    let data_dir = ctx.config().datadir();
223    let config = if let Some(cache_size) = cache_size {
224        reth_transaction_pool::blobstore::DiskFileBlobStoreConfig::default()
225            .with_max_cached_entries(cache_size)
226    } else {
227        Default::default()
228    };
229
230    Ok(reth_transaction_pool::blobstore::DiskFileBlobStore::open(data_dir.blobstore(), config)?)
231}
232
233/// Spawn local transaction backup task if enabled.
234fn spawn_local_backup_task<Node, Pool>(ctx: &BuilderContext<Node>, pool: Pool) -> eyre::Result<()>
235where
236    Node: FullNodeTypes,
237    Pool: TransactionPool + Clone + 'static,
238{
239    if !ctx.config().txpool.disable_transactions_backup {
240        let data_dir = ctx.config().datadir();
241        let transactions_path = ctx
242            .config()
243            .txpool
244            .transactions_backup_path
245            .clone()
246            .unwrap_or_else(|| data_dir.txpool_transactions());
247
248        let transactions_backup_config =
249            reth_transaction_pool::maintain::LocalTransactionBackupConfig::with_local_txs_backup(
250                transactions_path,
251            );
252
253        ctx.task_executor().spawn_critical_with_graceful_shutdown_signal(
254            "local transactions backup task",
255            |shutdown| {
256                reth_transaction_pool::maintain::backup_local_transactions_task(
257                    shutdown,
258                    pool,
259                    transactions_backup_config,
260                )
261            },
262        );
263    }
264    Ok(())
265}
266
267/// Spawn the main maintenance task for transaction pool.
268fn spawn_pool_maintenance_task<Node, Pool>(
269    ctx: &BuilderContext<Node>,
270    pool: Pool,
271    pool_config: &PoolConfig,
272) -> eyre::Result<()>
273where
274    Node: FullNodeTypes<Types: NodeTypes<ChainSpec: EthereumHardforks>>,
275    Pool: reth_transaction_pool::TransactionPoolExt<Block = BlockTy<Node::Types>> + Clone + 'static,
276    Pool::Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>,
277{
278    let chain_events = ctx.provider().canonical_state_stream();
279    let client = ctx.provider().clone();
280
281    ctx.task_executor().spawn_critical_task(
282        "txpool maintenance task",
283        reth_transaction_pool::maintain::maintain_transaction_pool_future(
284            client,
285            pool,
286            chain_events,
287            ctx.task_executor().clone(),
288            reth_transaction_pool::maintain::MaintainPoolConfig {
289                max_tx_lifetime: pool_config.max_queued_lifetime,
290                no_local_exemptions: pool_config.local_transactions_config.no_exemptions,
291                ..Default::default()
292            },
293        ),
294    );
295
296    Ok(())
297}
298
299/// Spawn all maintenance tasks for a transaction pool (backup + main maintenance).
300pub fn spawn_maintenance_tasks<Node, Pool>(
301    ctx: &BuilderContext<Node>,
302    pool: Pool,
303    pool_config: &PoolConfig,
304) -> eyre::Result<()>
305where
306    Node: FullNodeTypes<Types: NodeTypes<ChainSpec: EthereumHardforks>>,
307    Pool: reth_transaction_pool::TransactionPoolExt<Block = BlockTy<Node::Types>> + Clone + 'static,
308    Pool::Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>,
309{
310    spawn_local_backup_task(ctx, pool.clone())?;
311    spawn_pool_maintenance_task(ctx, pool, pool_config)?;
312    Ok(())
313}
314
315impl<Node: FullNodeTypes, V: std::fmt::Debug> std::fmt::Debug for TxPoolBuilder<'_, Node, V> {
316    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
317        f.debug_struct("TxPoolBuilder").field("validator", &self.validator).finish()
318    }
319}
320
321#[cfg(test)]
322mod tests {
323    use super::*;
324    use reth_transaction_pool::PoolConfig;
325
326    #[test]
327    fn test_pool_builder_config_overrides_apply() {
328        let base_config = PoolConfig::default();
329        let overrides = PoolBuilderConfigOverrides {
330            pending_limit: Some(SubPoolLimit::default()),
331            max_account_slots: Some(100),
332            minimal_protocol_basefee: Some(1000),
333            ..Default::default()
334        };
335
336        let updated_config = overrides.apply(base_config);
337        assert_eq!(updated_config.max_account_slots, 100);
338        assert_eq!(updated_config.minimal_protocol_basefee, 1000);
339    }
340
341    #[test]
342    fn test_pool_builder_config_overrides_default() {
343        let overrides = PoolBuilderConfigOverrides::default();
344        assert!(overrides.pending_limit.is_none());
345        assert!(overrides.max_account_slots.is_none());
346        assert!(overrides.local_addresses.is_empty());
347    }
348}