reth_node_builder/components/
pool.rs1use crate::{BuilderContext, FullNodeTypes};
4use alloy_primitives::map::AddressSet;
5use reth_chain_state::CanonStateSubscriptions;
6use reth_chainspec::EthereumHardforks;
7use reth_node_api::{BlockTy, NodeTypes, TxTy};
8use reth_transaction_pool::{
9 blobstore::DiskFileBlobStore, BlobStore, CoinbaseTipOrdering, PoolConfig, PoolTransaction,
10 SubPoolLimit, TransactionOrdering, TransactionPool, TransactionValidationTaskExecutor,
11 TransactionValidator,
12};
13use std::future::Future;
14
15pub trait PoolBuilder<Node: FullNodeTypes, Evm>: Send {
17 type Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>>
19 + Unpin
20 + 'static;
21
22 fn build_pool(
24 self,
25 ctx: &BuilderContext<Node>,
26 evm_config: Evm,
27 ) -> impl Future<Output = eyre::Result<Self::Pool>> + Send;
28}
29
30impl<Node, F, Fut, Pool, Evm> PoolBuilder<Node, Evm> for F
31where
32 Node: FullNodeTypes,
33 Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>>
34 + Unpin
35 + 'static,
36 F: FnOnce(&BuilderContext<Node>, Evm) -> Fut + Send,
37 Fut: Future<Output = eyre::Result<Pool>> + Send,
38{
39 type Pool = Pool;
40
41 fn build_pool(
42 self,
43 ctx: &BuilderContext<Node>,
44 evm_config: Evm,
45 ) -> impl Future<Output = eyre::Result<Self::Pool>> {
46 self(ctx, evm_config)
47 }
48}
49
50#[derive(Debug, Clone, Default)]
52pub struct PoolBuilderConfigOverrides {
53 pub pending_limit: Option<SubPoolLimit>,
55 pub basefee_limit: Option<SubPoolLimit>,
57 pub queued_limit: Option<SubPoolLimit>,
59 pub blob_limit: Option<SubPoolLimit>,
61 pub max_account_slots: Option<usize>,
63 pub minimal_protocol_basefee: Option<u64>,
65 pub local_addresses: AddressSet,
67 pub additional_validation_tasks: Option<usize>,
69}
70
71impl PoolBuilderConfigOverrides {
72 pub fn apply(self, mut config: PoolConfig) -> PoolConfig {
74 let Self {
75 pending_limit,
76 basefee_limit,
77 queued_limit,
78 blob_limit,
79 max_account_slots,
80 minimal_protocol_basefee,
81 local_addresses,
82 additional_validation_tasks: _,
83 } = self;
84
85 if let Some(pending_limit) = pending_limit {
86 config.pending_limit = pending_limit;
87 }
88 if let Some(basefee_limit) = basefee_limit {
89 config.basefee_limit = basefee_limit;
90 }
91 if let Some(queued_limit) = queued_limit {
92 config.queued_limit = queued_limit;
93 }
94 if let Some(blob_limit) = blob_limit {
95 config.blob_limit = blob_limit;
96 }
97 if let Some(max_account_slots) = max_account_slots {
98 config.max_account_slots = max_account_slots;
99 }
100 if let Some(minimal_protocol_basefee) = minimal_protocol_basefee {
101 config.minimal_protocol_basefee = minimal_protocol_basefee;
102 }
103 config.local_transactions_config.local_addresses.extend(local_addresses);
104
105 config
106 }
107}
108
109pub struct TxPoolBuilder<'a, Node: FullNodeTypes, V = ()> {
114 ctx: &'a BuilderContext<Node>,
115 validator: V,
116}
117
118impl<'a, Node: FullNodeTypes> TxPoolBuilder<'a, Node> {
119 pub const fn new(ctx: &'a BuilderContext<Node>) -> Self {
121 Self { ctx, validator: () }
122 }
123}
124
125impl<'a, Node: FullNodeTypes, V> TxPoolBuilder<'a, Node, V> {
126 pub fn with_validator<NewV>(self, validator: NewV) -> TxPoolBuilder<'a, Node, NewV> {
128 TxPoolBuilder { ctx: self.ctx, validator }
129 }
130}
131
132impl<'a, Node, V> TxPoolBuilder<'a, Node, TransactionValidationTaskExecutor<V>>
133where
134 Node: FullNodeTypes<Types: NodeTypes<ChainSpec: EthereumHardforks>>,
135 V: TransactionValidator<Block = BlockTy<Node::Types>> + 'static,
136 V::Transaction:
137 PoolTransaction<Consensus = TxTy<Node::Types>> + reth_transaction_pool::EthPoolTransaction,
138{
139 pub fn build<BS>(
142 self,
143 blob_store: BS,
144 pool_config: PoolConfig,
145 ) -> reth_transaction_pool::Pool<
146 TransactionValidationTaskExecutor<V>,
147 CoinbaseTipOrdering<V::Transaction>,
148 BS,
149 >
150 where
151 BS: BlobStore,
152 {
153 let TxPoolBuilder { validator, .. } = self;
154 reth_transaction_pool::Pool::new(
155 validator,
156 CoinbaseTipOrdering::default(),
157 blob_store,
158 pool_config,
159 )
160 }
161
162 pub fn build_and_spawn_maintenance_task<BS>(
165 self,
166 blob_store: BS,
167 pool_config: PoolConfig,
168 ) -> eyre::Result<
169 reth_transaction_pool::Pool<
170 TransactionValidationTaskExecutor<V>,
171 CoinbaseTipOrdering<V::Transaction>,
172 BS,
173 >,
174 >
175 where
176 BS: BlobStore,
177 {
178 self.build_with_ordering_and_spawn_maintenance_task(
179 CoinbaseTipOrdering::default(),
180 blob_store,
181 pool_config,
182 )
183 }
184
185 pub fn build_with_ordering_and_spawn_maintenance_task<BS, O>(
188 self,
189 ordering: O,
190 blob_store: BS,
191 pool_config: PoolConfig,
192 ) -> eyre::Result<reth_transaction_pool::Pool<TransactionValidationTaskExecutor<V>, O, BS>>
193 where
194 BS: BlobStore,
195 O: TransactionOrdering<Transaction = V::Transaction>,
196 {
197 let TxPoolBuilder { ctx, validator, .. } = self;
198
199 let transaction_pool =
200 reth_transaction_pool::Pool::new(validator, ordering, blob_store, pool_config.clone());
201
202 spawn_maintenance_tasks(ctx, transaction_pool.clone(), &pool_config)?;
203
204 Ok(transaction_pool)
205 }
206}
207
208pub fn create_blob_store<Node: FullNodeTypes>(
210 ctx: &BuilderContext<Node>,
211) -> eyre::Result<DiskFileBlobStore> {
212 let cache_size = Some(ctx.config().txpool.max_cached_entries);
213 create_blob_store_with_cache(ctx, cache_size)
214}
215
216pub fn create_blob_store_with_cache<Node: FullNodeTypes>(
219 ctx: &BuilderContext<Node>,
220 cache_size: Option<u32>,
221) -> eyre::Result<DiskFileBlobStore> {
222 let data_dir = ctx.config().datadir();
223 let config = if let Some(cache_size) = cache_size {
224 reth_transaction_pool::blobstore::DiskFileBlobStoreConfig::default()
225 .with_max_cached_entries(cache_size)
226 } else {
227 Default::default()
228 };
229
230 Ok(reth_transaction_pool::blobstore::DiskFileBlobStore::open(data_dir.blobstore(), config)?)
231}
232
233fn spawn_local_backup_task<Node, Pool>(ctx: &BuilderContext<Node>, pool: Pool) -> eyre::Result<()>
235where
236 Node: FullNodeTypes,
237 Pool: TransactionPool + Clone + 'static,
238{
239 if !ctx.config().txpool.disable_transactions_backup {
240 let data_dir = ctx.config().datadir();
241 let transactions_path = ctx
242 .config()
243 .txpool
244 .transactions_backup_path
245 .clone()
246 .unwrap_or_else(|| data_dir.txpool_transactions());
247
248 let transactions_backup_config =
249 reth_transaction_pool::maintain::LocalTransactionBackupConfig::with_local_txs_backup(
250 transactions_path,
251 );
252
253 ctx.task_executor().spawn_critical_with_graceful_shutdown_signal(
254 "local transactions backup task",
255 |shutdown| {
256 reth_transaction_pool::maintain::backup_local_transactions_task(
257 shutdown,
258 pool,
259 transactions_backup_config,
260 )
261 },
262 );
263 }
264 Ok(())
265}
266
267fn spawn_pool_maintenance_task<Node, Pool>(
269 ctx: &BuilderContext<Node>,
270 pool: Pool,
271 pool_config: &PoolConfig,
272) -> eyre::Result<()>
273where
274 Node: FullNodeTypes<Types: NodeTypes<ChainSpec: EthereumHardforks>>,
275 Pool: reth_transaction_pool::TransactionPoolExt<Block = BlockTy<Node::Types>> + Clone + 'static,
276 Pool::Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>,
277{
278 let chain_events = ctx.provider().canonical_state_stream();
279 let client = ctx.provider().clone();
280
281 ctx.task_executor().spawn_critical_task(
282 "txpool maintenance task",
283 reth_transaction_pool::maintain::maintain_transaction_pool_future(
284 client,
285 pool,
286 chain_events,
287 ctx.task_executor().clone(),
288 reth_transaction_pool::maintain::MaintainPoolConfig {
289 max_tx_lifetime: pool_config.max_queued_lifetime,
290 no_local_exemptions: pool_config.local_transactions_config.no_exemptions,
291 ..Default::default()
292 },
293 ),
294 );
295
296 Ok(())
297}
298
299pub fn spawn_maintenance_tasks<Node, Pool>(
301 ctx: &BuilderContext<Node>,
302 pool: Pool,
303 pool_config: &PoolConfig,
304) -> eyre::Result<()>
305where
306 Node: FullNodeTypes<Types: NodeTypes<ChainSpec: EthereumHardforks>>,
307 Pool: reth_transaction_pool::TransactionPoolExt<Block = BlockTy<Node::Types>> + Clone + 'static,
308 Pool::Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>,
309{
310 spawn_local_backup_task(ctx, pool.clone())?;
311 spawn_pool_maintenance_task(ctx, pool, pool_config)?;
312 Ok(())
313}
314
315impl<Node: FullNodeTypes, V: std::fmt::Debug> std::fmt::Debug for TxPoolBuilder<'_, Node, V> {
316 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
317 f.debug_struct("TxPoolBuilder").field("validator", &self.validator).finish()
318 }
319}
320
321#[cfg(test)]
322mod tests {
323 use super::*;
324 use reth_transaction_pool::PoolConfig;
325
326 #[test]
327 fn test_pool_builder_config_overrides_apply() {
328 let base_config = PoolConfig::default();
329 let overrides = PoolBuilderConfigOverrides {
330 pending_limit: Some(SubPoolLimit::default()),
331 max_account_slots: Some(100),
332 minimal_protocol_basefee: Some(1000),
333 ..Default::default()
334 };
335
336 let updated_config = overrides.apply(base_config);
337 assert_eq!(updated_config.max_account_slots, 100);
338 assert_eq!(updated_config.minimal_protocol_basefee, 1000);
339 }
340
341 #[test]
342 fn test_pool_builder_config_overrides_default() {
343 let overrides = PoolBuilderConfigOverrides::default();
344 assert!(overrides.pending_limit.is_none());
345 assert!(overrides.max_account_slots.is_none());
346 assert!(overrides.local_addresses.is_empty());
347 }
348}