reth_node_builder/components/
pool.rs1use crate::{BuilderContext, FullNodeTypes};
4use alloy_primitives::Address;
5use reth_chain_state::CanonStateSubscriptions;
6use reth_chainspec::EthereumHardforks;
7use reth_node_api::{NodeTypes, TxTy};
8use reth_transaction_pool::{
9 blobstore::DiskFileBlobStore, BlobStore, CoinbaseTipOrdering, PoolConfig, PoolTransaction,
10 SubPoolLimit, TransactionPool, TransactionValidationTaskExecutor, TransactionValidator,
11};
12use std::{collections::HashSet, future::Future};
13
14pub trait PoolBuilder<Node: FullNodeTypes>: Send {
16 type Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>>
18 + Unpin
19 + 'static;
20
21 fn build_pool(
23 self,
24 ctx: &BuilderContext<Node>,
25 ) -> impl Future<Output = eyre::Result<Self::Pool>> + Send;
26}
27
28impl<Node, F, Fut, Pool> PoolBuilder<Node> for F
29where
30 Node: FullNodeTypes,
31 Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>>
32 + Unpin
33 + 'static,
34 F: FnOnce(&BuilderContext<Node>) -> Fut + Send,
35 Fut: Future<Output = eyre::Result<Pool>> + Send,
36{
37 type Pool = Pool;
38
39 fn build_pool(
40 self,
41 ctx: &BuilderContext<Node>,
42 ) -> impl Future<Output = eyre::Result<Self::Pool>> {
43 self(ctx)
44 }
45}
46
47#[derive(Debug, Clone, Default)]
49pub struct PoolBuilderConfigOverrides {
50 pub pending_limit: Option<SubPoolLimit>,
52 pub basefee_limit: Option<SubPoolLimit>,
54 pub queued_limit: Option<SubPoolLimit>,
56 pub blob_limit: Option<SubPoolLimit>,
58 pub max_account_slots: Option<usize>,
60 pub minimal_protocol_basefee: Option<u64>,
62 pub local_addresses: HashSet<Address>,
64 pub additional_validation_tasks: Option<usize>,
66}
67
68impl PoolBuilderConfigOverrides {
69 pub fn apply(self, mut config: PoolConfig) -> PoolConfig {
71 let Self {
72 pending_limit,
73 basefee_limit,
74 queued_limit,
75 blob_limit,
76 max_account_slots,
77 minimal_protocol_basefee,
78 local_addresses,
79 additional_validation_tasks: _,
80 } = self;
81
82 if let Some(pending_limit) = pending_limit {
83 config.pending_limit = pending_limit;
84 }
85 if let Some(basefee_limit) = basefee_limit {
86 config.basefee_limit = basefee_limit;
87 }
88 if let Some(queued_limit) = queued_limit {
89 config.queued_limit = queued_limit;
90 }
91 if let Some(blob_limit) = blob_limit {
92 config.blob_limit = blob_limit;
93 }
94 if let Some(max_account_slots) = max_account_slots {
95 config.max_account_slots = max_account_slots;
96 }
97 if let Some(minimal_protocol_basefee) = minimal_protocol_basefee {
98 config.minimal_protocol_basefee = minimal_protocol_basefee;
99 }
100 config.local_transactions_config.local_addresses.extend(local_addresses);
101
102 config
103 }
104}
105
106pub struct TxPoolBuilder<'a, Node: FullNodeTypes, V = ()> {
111 ctx: &'a BuilderContext<Node>,
112 validator: V,
113}
114
115impl<'a, Node: FullNodeTypes> TxPoolBuilder<'a, Node> {
116 pub const fn new(ctx: &'a BuilderContext<Node>) -> Self {
118 Self { ctx, validator: () }
119 }
120}
121
122impl<'a, Node: FullNodeTypes, V> TxPoolBuilder<'a, Node, V> {
123 pub fn with_validator<NewV>(self, validator: NewV) -> TxPoolBuilder<'a, Node, NewV> {
125 TxPoolBuilder { ctx: self.ctx, validator }
126 }
127}
128
129impl<'a, Node, V> TxPoolBuilder<'a, Node, TransactionValidationTaskExecutor<V>>
130where
131 Node: FullNodeTypes<Types: NodeTypes<ChainSpec: EthereumHardforks>>,
132 V: TransactionValidator + 'static,
133 V::Transaction:
134 PoolTransaction<Consensus = TxTy<Node::Types>> + reth_transaction_pool::EthPoolTransaction,
135{
136 pub fn build<BS>(
139 self,
140 blob_store: BS,
141 pool_config: PoolConfig,
142 ) -> reth_transaction_pool::Pool<
143 TransactionValidationTaskExecutor<V>,
144 CoinbaseTipOrdering<V::Transaction>,
145 BS,
146 >
147 where
148 BS: BlobStore,
149 {
150 let TxPoolBuilder { validator, .. } = self;
151 reth_transaction_pool::Pool::new(
152 validator,
153 CoinbaseTipOrdering::default(),
154 blob_store,
155 pool_config,
156 )
157 }
158
159 pub fn build_and_spawn_maintenance_task<BS>(
162 self,
163 blob_store: BS,
164 pool_config: PoolConfig,
165 ) -> eyre::Result<
166 reth_transaction_pool::Pool<
167 TransactionValidationTaskExecutor<V>,
168 CoinbaseTipOrdering<V::Transaction>,
169 BS,
170 >,
171 >
172 where
173 BS: BlobStore,
174 {
175 let ctx = self.ctx;
176 let transaction_pool = self.build(blob_store, pool_config);
177 spawn_maintenance_tasks(ctx, transaction_pool.clone(), transaction_pool.config())?;
179
180 Ok(transaction_pool)
181 }
182}
183
184pub fn create_blob_store<Node: FullNodeTypes>(
186 ctx: &BuilderContext<Node>,
187) -> eyre::Result<DiskFileBlobStore> {
188 let cache_size = Some(ctx.config().txpool.max_cached_entries);
189 create_blob_store_with_cache(ctx, cache_size)
190}
191
192pub fn create_blob_store_with_cache<Node: FullNodeTypes>(
195 ctx: &BuilderContext<Node>,
196 cache_size: Option<u32>,
197) -> eyre::Result<DiskFileBlobStore> {
198 let data_dir = ctx.config().datadir();
199 let config = if let Some(cache_size) = cache_size {
200 reth_transaction_pool::blobstore::DiskFileBlobStoreConfig::default()
201 .with_max_cached_entries(cache_size)
202 } else {
203 Default::default()
204 };
205
206 Ok(reth_transaction_pool::blobstore::DiskFileBlobStore::open(data_dir.blobstore(), config)?)
207}
208
209fn spawn_local_backup_task<Node, Pool>(ctx: &BuilderContext<Node>, pool: Pool) -> eyre::Result<()>
211where
212 Node: FullNodeTypes,
213 Pool: TransactionPool + Clone + 'static,
214{
215 if !ctx.config().txpool.disable_transactions_backup {
216 let data_dir = ctx.config().datadir();
217 let transactions_path = ctx
218 .config()
219 .txpool
220 .transactions_backup_path
221 .clone()
222 .unwrap_or_else(|| data_dir.txpool_transactions());
223
224 let transactions_backup_config =
225 reth_transaction_pool::maintain::LocalTransactionBackupConfig::with_local_txs_backup(
226 transactions_path,
227 );
228
229 ctx.task_executor().spawn_critical_with_graceful_shutdown_signal(
230 "local transactions backup task",
231 |shutdown| {
232 reth_transaction_pool::maintain::backup_local_transactions_task(
233 shutdown,
234 pool,
235 transactions_backup_config,
236 )
237 },
238 );
239 }
240 Ok(())
241}
242
243fn spawn_pool_maintenance_task<Node, Pool>(
245 ctx: &BuilderContext<Node>,
246 pool: Pool,
247 pool_config: &PoolConfig,
248) -> eyre::Result<()>
249where
250 Node: FullNodeTypes<Types: NodeTypes<ChainSpec: EthereumHardforks>>,
251 Pool: reth_transaction_pool::TransactionPoolExt + Clone + 'static,
252 Pool::Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>,
253{
254 let chain_events = ctx.provider().canonical_state_stream();
255 let client = ctx.provider().clone();
256
257 ctx.task_executor().spawn_critical(
258 "txpool maintenance task",
259 reth_transaction_pool::maintain::maintain_transaction_pool_future(
260 client,
261 pool,
262 chain_events,
263 ctx.task_executor().clone(),
264 reth_transaction_pool::maintain::MaintainPoolConfig {
265 max_tx_lifetime: pool_config.max_queued_lifetime,
266 no_local_exemptions: pool_config.local_transactions_config.no_exemptions,
267 ..Default::default()
268 },
269 ),
270 );
271
272 Ok(())
273}
274
275pub fn spawn_maintenance_tasks<Node, Pool>(
277 ctx: &BuilderContext<Node>,
278 pool: Pool,
279 pool_config: &PoolConfig,
280) -> eyre::Result<()>
281where
282 Node: FullNodeTypes<Types: NodeTypes<ChainSpec: EthereumHardforks>>,
283 Pool: reth_transaction_pool::TransactionPoolExt + Clone + 'static,
284 Pool::Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>,
285{
286 spawn_local_backup_task(ctx, pool.clone())?;
287 spawn_pool_maintenance_task(ctx, pool, pool_config)?;
288 Ok(())
289}
290
291impl<Node: FullNodeTypes, V: std::fmt::Debug> std::fmt::Debug for TxPoolBuilder<'_, Node, V> {
292 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
293 f.debug_struct("TxPoolBuilder").field("validator", &self.validator).finish()
294 }
295}
296
297#[cfg(test)]
298mod tests {
299 use super::*;
300 use reth_transaction_pool::PoolConfig;
301
302 #[test]
303 fn test_pool_builder_config_overrides_apply() {
304 let base_config = PoolConfig::default();
305 let overrides = PoolBuilderConfigOverrides {
306 pending_limit: Some(SubPoolLimit::default()),
307 max_account_slots: Some(100),
308 minimal_protocol_basefee: Some(1000),
309 ..Default::default()
310 };
311
312 let updated_config = overrides.apply(base_config);
313 assert_eq!(updated_config.max_account_slots, 100);
314 assert_eq!(updated_config.minimal_protocol_basefee, 1000);
315 }
316
317 #[test]
318 fn test_pool_builder_config_overrides_default() {
319 let overrides = PoolBuilderConfigOverrides::default();
320 assert!(overrides.pending_limit.is_none());
321 assert!(overrides.max_account_slots.is_none());
322 assert!(overrides.local_addresses.is_empty());
323 }
324}