1use alloy_primitives::{map::AddressMap, Address, BlockNumber, TxNumber, B256};
3use reth_config::config::EtlConfig;
4use reth_db_api::{
5 cursor::{DbCursorRO, DbCursorRW},
6 models::{
7 sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey,
8 AccountBeforeTx, AddressStorageKey, BlockNumberAddress, ShardedKey,
9 },
10 table::{Decode, Decompress, Table},
11 transaction::DbTx,
12 BlockNumberList,
13};
14use reth_etl::Collector;
15use reth_primitives_traits::NodePrimitives;
16use reth_provider::{
17 providers::StaticFileProvider, to_range, BlockReader, DBProvider, EitherWriter, ProviderError,
18 StaticFileProviderFactory,
19};
20use reth_stages_api::StageError;
21use reth_static_file_types::StaticFileSegment;
22use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
23use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
24use tracing::info;
25
/// Number of distinct blocks to accumulate in the in-memory cache before flushing the
/// collected history indices to the ETL collector.
const DEFAULT_CACHE_THRESHOLD: u64 = 100_000;
28
/// Walks the changeset table `CS` over `range` and accumulates, for every partial key `P`,
/// the list of block numbers at which it changed, flushing the per-key lists into an ETL
/// [`Collector`] keyed by the sharded history key `H::Key`.
///
/// * `sharded_key_factory` builds the history shard key from a partial key and the highest
///   block number of the accumulated list.
/// * `partial_key_factory` extracts `(block_number, partial_key)` from a raw changeset entry.
///
/// Returns the filled collector, ready to be loaded into the history table `H`.
pub(crate) fn collect_history_indices<Provider, CS, H, P>(
    provider: &Provider,
    range: impl RangeBounds<CS::Key>,
    sharded_key_factory: impl Fn(P, BlockNumber) -> H::Key,
    partial_key_factory: impl Fn((CS::Key, CS::Value)) -> (u64, P),
    etl_config: &EtlConfig,
) -> Result<Collector<H::Key, H::Value>, StageError>
where
    Provider: DBProvider,
    CS: Table,
    H: Table<Value = BlockNumberList>,
    P: Copy + Eq + Hash,
{
    let mut changeset_cursor = provider.tx_ref().cursor_read::<CS>()?;

    let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
    let mut cache: HashMap<P, Vec<u64>> = HashMap::default();

    // Drains the cache into the collector; each key's shard is keyed by the last
    // (highest) block number of its list, since entries are visited in ascending order.
    let mut collect = |cache: &mut HashMap<P, Vec<u64>>| {
        for (key, indices) in cache.drain() {
            let last = *indices.last().expect("qed");
            collector
                .insert(sharded_key_factory(key, last), BlockNumberList::new_pre_sorted(indices))?;
        }
        Ok::<(), StageError>(())
    };

    // Progress is reported roughly every 0.1% of the total changeset entries.
    let total_changesets = provider.tx_ref().entries::<CS>()?;
    let interval = (total_changesets / 1000).max(1);

    let mut flush_counter = 0;
    let mut current_block_number = u64::MAX;
    for (idx, entry) in changeset_cursor.walk_range(range)?.enumerate() {
        let (block_number, key) = partial_key_factory(entry?);
        cache.entry(key).or_default().push(block_number);

        if idx > 0 && idx.is_multiple_of(interval) && total_changesets > 1000 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
        }

        // Only consider flushing on block boundaries so one block's entries are never
        // split across flushes; count distinct blocks seen since the last flush.
        if current_block_number != block_number {
            current_block_number = block_number;
            flush_counter += 1;
            if flush_counter > DEFAULT_CACHE_THRESHOLD {
                collect(&mut cache)?;
                flush_counter = 0;
            }
        }
    }
    // Flush whatever remains after the walk completes.
    collect(&mut cache)?;

    Ok(collector)
}
101
102fn collect_indices<K, F>(
104 cache: impl Iterator<Item = (K, Vec<u64>)>,
105 mut insert_fn: F,
106) -> Result<(), StageError>
107where
108 F: FnMut(K, Vec<u64>) -> Result<(), StageError>,
109{
110 for (key, indices) in cache {
111 insert_fn(key, indices)?
112 }
113 Ok(())
114}
115
116pub(crate) fn collect_account_history_indices<Provider>(
118 provider: &Provider,
119 range: impl RangeBounds<BlockNumber>,
120 etl_config: &EtlConfig,
121) -> Result<Collector<ShardedKey<Address>, BlockNumberList>, StageError>
122where
123 Provider: DBProvider + ChangeSetReader + StaticFileProviderFactory,
124{
125 let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
126 let mut cache: AddressMap<Vec<u64>> = AddressMap::default();
127
128 let mut insert_fn = |address: Address, indices: Vec<u64>| {
129 let last = indices.last().expect("indices is non-empty");
130 collector
131 .insert(ShardedKey::new(address, *last), BlockNumberList::new_pre_sorted(indices))?;
132 Ok(())
133 };
134
135 let range = to_range(range);
137 let start_block = range.start;
138
139 let static_file_provider = provider.static_file_provider();
141
142 let walker = static_file_provider.walk_account_changeset_range(range);
143
144 let mut flush_counter = 0;
145 let mut current_block_number = u64::MAX;
146
147 for changeset_result in walker {
148 let (block_number, AccountBeforeTx { address, .. }) = changeset_result?;
149 cache.entry(address).or_default().push(block_number);
150
151 if block_number != current_block_number {
152 current_block_number = block_number;
153 flush_counter += 1;
154 }
155
156 if flush_counter > DEFAULT_CACHE_THRESHOLD {
157 info!(
158 target: "sync::stages::index_history",
159 processed_blocks = current_block_number.saturating_sub(start_block) + 1,
160 current_block = current_block_number,
161 "Collecting indices"
162 );
163 collect_indices(cache.drain(), &mut insert_fn)?;
164 flush_counter = 0;
165 }
166 }
167 collect_indices(cache.into_iter(), insert_fn)?;
168
169 Ok(collector)
170}
171
172pub(crate) fn collect_storage_history_indices<Provider>(
174 provider: &Provider,
175 range: impl RangeBounds<BlockNumber>,
176 etl_config: &EtlConfig,
177) -> Result<Collector<StorageShardedKey, BlockNumberList>, StageError>
178where
179 Provider: DBProvider + StorageChangeSetReader + StaticFileProviderFactory,
180{
181 let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
182 let mut cache: HashMap<AddressStorageKey, Vec<u64>> = HashMap::default();
183
184 let mut insert_fn = |key: AddressStorageKey, indices: Vec<u64>| {
185 let last = indices.last().expect("qed");
186 collector.insert(
187 StorageShardedKey::new(key.0 .0, key.0 .1, *last),
188 BlockNumberList::new_pre_sorted(indices),
189 )?;
190 Ok::<(), StageError>(())
191 };
192
193 let range = to_range(range);
194 let start_block = range.start;
195 let static_file_provider = provider.static_file_provider();
196
197 let walker = static_file_provider.walk_storage_changeset_range(range);
198
199 let mut flush_counter = 0;
200 let mut current_block_number = u64::MAX;
201
202 for changeset_result in walker {
203 let (BlockNumberAddress((block_number, address)), storage) = changeset_result?;
204 cache.entry(AddressStorageKey((address, storage.key))).or_default().push(block_number);
205
206 if block_number != current_block_number {
207 current_block_number = block_number;
208 flush_counter += 1;
209 }
210
211 if flush_counter > DEFAULT_CACHE_THRESHOLD {
212 info!(
213 target: "sync::stages::index_history",
214 processed_blocks = current_block_number.saturating_sub(start_block) + 1,
215 current_block = current_block_number,
216 "Collecting indices"
217 );
218 collect_indices(cache.drain(), &mut insert_fn)?;
219 flush_counter = 0;
220 }
221 }
222
223 collect_indices(cache.into_iter(), insert_fn)?;
224
225 Ok(collector)
226}
227
/// Streams the ETL `collector` (sorted by [`ShardedKey<Address>`], i.e. grouped by address)
/// and writes account history shards through `writer`.
///
/// For each address the block-number lists are concatenated in order; full shards are
/// flushed eagerly via [`flush_account_history_shards_partial`] to bound memory, and the
/// final (possibly short) shard is written by [`flush_account_history_shards`], which keys
/// the last shard with `u64::MAX`.
///
/// When `append_only` is `false`, the previously stored last shard of an address is loaded
/// first so new indices are merged onto the existing history.
pub(crate) fn load_account_history<N, CURSOR>(
    mut collector: Collector<ShardedKey<Address>, BlockNumberList>,
    append_only: bool,
    writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
    N: NodePrimitives,
    CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
        + DbCursorRO<reth_db_api::tables::AccountsHistory>,
{
    // Address currently being accumulated and its pending block numbers.
    let mut current_address: Option<Address> = None;
    let mut current_list = Vec::<u64>::new();

    // Progress is reported roughly every 10% of the collector entries.
    let total_entries = collector.len();
    let interval = (total_entries / 10).max(1);

    for (index, element) in collector.iter()?.enumerate() {
        let (k, v) = element?;
        let sharded_key = ShardedKey::<Address>::decode_owned(k)?;
        let new_list = BlockNumberList::decompress_owned(v)?;

        if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
        }

        let address = sharded_key.key;

        if current_address != Some(address) {
            // Finalize the previous address before starting a new one.
            if let Some(prev_addr) = current_address {
                flush_account_history_shards(prev_addr, &mut current_list, append_only, writer)?;
            }

            current_address = Some(address);
            current_list.clear();

            // In non-append mode, seed with the existing last shard so it gets re-sharded
            // together with the new indices.
            if !append_only &&
                let Some(last_shard) = writer.get_last_account_history_shard(address)?
            {
                current_list.extend(last_shard.iter());
            }
        }

        current_list.extend(new_list.iter());

        // Eagerly write out any full shards, keeping a non-empty remainder in memory.
        flush_account_history_shards_partial(address, &mut current_list, append_only, writer)?;
    }

    // Finalize the last address seen.
    if let Some(addr) = current_address {
        flush_account_history_shards(addr, &mut current_list, append_only, writer)?;
    }

    Ok(())
}
300
301fn flush_account_history_shards_partial<N, CURSOR>(
307 address: Address,
308 list: &mut Vec<u64>,
309 append_only: bool,
310 writer: &mut EitherWriter<'_, CURSOR, N>,
311) -> Result<(), StageError>
312where
313 N: NodePrimitives,
314 CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
315 + DbCursorRO<reth_db_api::tables::AccountsHistory>,
316{
317 if list.len() <= NUM_OF_INDICES_IN_SHARD {
319 return Ok(());
320 }
321
322 let num_full_shards = list.len() / NUM_OF_INDICES_IN_SHARD;
323
324 let shards_to_flush = if list.len().is_multiple_of(NUM_OF_INDICES_IN_SHARD) {
327 num_full_shards - 1
328 } else {
329 num_full_shards
330 };
331
332 if shards_to_flush == 0 {
333 return Ok(());
334 }
335
336 let flush_len = shards_to_flush * NUM_OF_INDICES_IN_SHARD;
338 let remainder = list.split_off(flush_len);
339
340 for chunk in list.chunks(NUM_OF_INDICES_IN_SHARD) {
342 let highest = *chunk.last().expect("chunk is non-empty");
343 let key = ShardedKey::new(address, highest);
344 let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
345
346 if append_only {
347 writer.append_account_history(key, &value)?;
348 } else {
349 writer.upsert_account_history(key, &value)?;
350 }
351 }
352
353 *list = remainder;
355 Ok(())
356}
357
358fn flush_account_history_shards<N, CURSOR>(
363 address: Address,
364 list: &mut Vec<u64>,
365 append_only: bool,
366 writer: &mut EitherWriter<'_, CURSOR, N>,
367) -> Result<(), StageError>
368where
369 N: NodePrimitives,
370 CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
371 + DbCursorRO<reth_db_api::tables::AccountsHistory>,
372{
373 if list.is_empty() {
374 return Ok(());
375 }
376
377 let num_chunks = list.len().div_ceil(NUM_OF_INDICES_IN_SHARD);
378
379 for (i, chunk) in list.chunks(NUM_OF_INDICES_IN_SHARD).enumerate() {
380 let is_last = i == num_chunks - 1;
381
382 let highest = if is_last { u64::MAX } else { *chunk.last().expect("chunk is non-empty") };
385
386 let key = ShardedKey::new(address, highest);
387 let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
388
389 if append_only {
390 writer.append_account_history(key, &value)?;
391 } else {
392 writer.upsert_account_history(key, &value)?;
393 }
394 }
395
396 list.clear();
397 Ok(())
398}
399
400pub(crate) fn missing_static_data_error<Provider>(
403 last_tx_num: TxNumber,
404 static_file_provider: &StaticFileProvider<Provider::Primitives>,
405 provider: &Provider,
406 segment: StaticFileSegment,
407) -> Result<StageError, ProviderError>
408where
409 Provider: BlockReader + StaticFileProviderFactory,
410{
411 let mut last_block =
412 static_file_provider.get_highest_static_file_block(segment).unwrap_or_default();
413
414 loop {
417 if let Some(indices) = provider.block_body_indices(last_block)? &&
418 indices.last_tx_num() <= last_tx_num
419 {
420 break
421 }
422 if last_block == 0 {
423 break
424 }
425 last_block -= 1;
426 }
427
428 let missing_block = Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default());
429
430 Ok(StageError::MissingStaticFileData {
431 block: Box::new(missing_block.block_with_parent()),
432 segment,
433 })
434}
435
/// Streams the ETL `collector` (sorted by [`StorageShardedKey`], i.e. grouped by
/// `(address, storage key)`) and writes storage history shards through `writer`.
///
/// For each slot the block-number lists are concatenated in order; full shards are flushed
/// eagerly via [`flush_storage_history_shards_partial`] to bound memory, and the final
/// (possibly short) shard is written by [`flush_storage_history_shards`], which keys the
/// last shard with `u64::MAX`.
///
/// When `append_only` is `false`, the previously stored last shard of the slot is loaded
/// first so new indices are merged onto the existing history.
pub(crate) fn load_storage_history<N, CURSOR>(
    mut collector: Collector<StorageShardedKey, BlockNumberList>,
    append_only: bool,
    writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
    N: NodePrimitives,
    CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
        + DbCursorRO<reth_db_api::tables::StoragesHistory>,
{
    // `(address, storage key)` currently being accumulated and its pending block numbers.
    let mut current_key: Option<(Address, B256)> = None;
    let mut current_list = Vec::<u64>::new();

    // Progress is reported roughly every 10% of the collector entries.
    let total_entries = collector.len();
    let interval = (total_entries / 10).max(1);

    for (index, element) in collector.iter()?.enumerate() {
        let (k, v) = element?;
        let sharded_key = StorageShardedKey::decode_owned(k)?;
        let new_list = BlockNumberList::decompress_owned(v)?;

        if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
        }

        let partial_key = (sharded_key.address, sharded_key.sharded_key.key);

        if current_key != Some(partial_key) {
            // Finalize the previous slot before starting a new one.
            if let Some((prev_addr, prev_storage_key)) = current_key {
                flush_storage_history_shards(
                    prev_addr,
                    prev_storage_key,
                    &mut current_list,
                    append_only,
                    writer,
                )?;
            }

            current_key = Some(partial_key);
            current_list.clear();

            // In non-append mode, seed with the existing last shard so it gets re-sharded
            // together with the new indices.
            if !append_only &&
                let Some(last_shard) =
                    writer.get_last_storage_history_shard(partial_key.0, partial_key.1)?
            {
                current_list.extend(last_shard.iter());
            }
        }

        current_list.extend(new_list.iter());

        // Eagerly write out any full shards, keeping a non-empty remainder in memory.
        flush_storage_history_shards_partial(
            partial_key.0,
            partial_key.1,
            &mut current_list,
            append_only,
            writer,
        )?;
    }

    // Finalize the last slot seen.
    if let Some((addr, storage_key)) = current_key {
        flush_storage_history_shards(addr, storage_key, &mut current_list, append_only, writer)?;
    }

    Ok(())
}
521
522fn flush_storage_history_shards_partial<N, CURSOR>(
528 address: Address,
529 storage_key: B256,
530 list: &mut Vec<u64>,
531 append_only: bool,
532 writer: &mut EitherWriter<'_, CURSOR, N>,
533) -> Result<(), StageError>
534where
535 N: NodePrimitives,
536 CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
537 + DbCursorRO<reth_db_api::tables::StoragesHistory>,
538{
539 if list.len() <= NUM_OF_INDICES_IN_SHARD {
541 return Ok(());
542 }
543
544 let num_full_shards = list.len() / NUM_OF_INDICES_IN_SHARD;
545
546 let shards_to_flush = if list.len().is_multiple_of(NUM_OF_INDICES_IN_SHARD) {
549 num_full_shards - 1
550 } else {
551 num_full_shards
552 };
553
554 if shards_to_flush == 0 {
555 return Ok(());
556 }
557
558 let flush_len = shards_to_flush * NUM_OF_INDICES_IN_SHARD;
560 let remainder = list.split_off(flush_len);
561
562 for chunk in list.chunks(NUM_OF_INDICES_IN_SHARD) {
564 let highest = *chunk.last().expect("chunk is non-empty");
565 let key = StorageShardedKey::new(address, storage_key, highest);
566 let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
567
568 if append_only {
569 writer.append_storage_history(key, &value)?;
570 } else {
571 writer.upsert_storage_history(key, &value)?;
572 }
573 }
574
575 *list = remainder;
577 Ok(())
578}
579
580fn flush_storage_history_shards<N, CURSOR>(
586 address: Address,
587 storage_key: B256,
588 list: &mut Vec<u64>,
589 append_only: bool,
590 writer: &mut EitherWriter<'_, CURSOR, N>,
591) -> Result<(), StageError>
592where
593 N: NodePrimitives,
594 CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
595 + DbCursorRO<reth_db_api::tables::StoragesHistory>,
596{
597 if list.is_empty() {
598 return Ok(());
599 }
600
601 let num_chunks = list.len().div_ceil(NUM_OF_INDICES_IN_SHARD);
602
603 for (i, chunk) in list.chunks(NUM_OF_INDICES_IN_SHARD).enumerate() {
604 let is_last = i == num_chunks - 1;
605
606 let highest = if is_last { u64::MAX } else { *chunk.last().expect("chunk is non-empty") };
609
610 let key = StorageShardedKey::new(address, storage_key, highest);
611 let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
612
613 if append_only {
614 writer.append_storage_history(key, &value)?;
615 } else {
616 writer.upsert_storage_history(key, &value)?;
617 }
618 }
619
620 list.clear();
621 Ok(())
622}